* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_OSC
-#include <libcfs/libcfs.h>
-
-#include <lustre/lustre_user.h>
-
+#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
-#include <lustre_ioctl.h>
+#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
-#include <lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
+#include <lustre_osc.h>
-#include "osc_cl_internal.h"
#include "osc_internal.h"
atomic_t osc_pool_req_count;
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
-struct osc_brw_async_args {
- struct obdo *aa_oa;
- int aa_requested_nob;
- int aa_nio_count;
- u32 aa_page_count;
- int aa_resends;
- struct brw_page **aa_ppga;
- struct client_obd *aa_cli;
- struct list_head aa_oaps;
- struct list_head aa_exts;
-};
+static int osc_idle_timeout = 20;
+module_param(osc_idle_timeout, uint, 0644);
#define osc_grant_args osc_brw_async_args
void *la_cookie;
};
-struct osc_enqueue_args {
- struct obd_export *oa_exp;
- enum ldlm_type oa_type;
- enum ldlm_mode oa_mode;
- __u64 *oa_flags;
- osc_enqueue_upcall_f oa_upcall;
- void *oa_cookie;
- struct ost_lvb *oa_lvb;
- struct lustre_handle oa_lockh;
- unsigned int oa_agl:1;
-};
-
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
void *data, int rc);
}
static int osc_setattr_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct osc_setattr_args *sa, int rc)
+ struct ptlrpc_request *req, void *args, int rc)
{
- struct ost_body *body;
- ENTRY;
+ struct osc_setattr_args *sa = args;
+ struct ost_body *body;
- if (rc != 0)
- GOTO(out, rc);
+ ENTRY;
- body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- GOTO(out, rc = -EPROTO);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ GOTO(out, rc = -EPROTO);
lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
&body->oa);
out:
- rc = sa->sa_upcall(sa->sa_cookie, rc);
- RETURN(rc);
+ rc = sa->sa_upcall(sa->sa_cookie, rc);
+ RETURN(rc);
}
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
/* Do not wait for response. */
ptlrpcd_add_req(req);
} else {
- req->rq_interpret_reply =
- (ptlrpc_interpterer_t)osc_setattr_interpret;
+ req->rq_interpret_reply = osc_setattr_interpret;
- CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
- sa = ptlrpc_req_async_args(req);
+ sa = ptlrpc_req_async_args(sa, req);
sa->sa_oa = oa;
sa->sa_upcall = upcall;
sa->sa_cookie = cookie;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
+ ptlrpc_set_add_req(rqset, req);
}
RETURN(0);
}
req->rq_interpret_reply = osc_ladvise_interpret;
- CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
- la = ptlrpc_req_async_args(req);
+ la = ptlrpc_req_async_args(la, req);
la->la_oa = oa;
la->la_upcall = upcall;
la->la_cookie = cookie;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
+ ptlrpc_set_add_req(rqset, req);
RETURN(0);
}
RETURN(rc);
}
-int osc_punch_base(struct obd_export *exp, struct obdo *oa,
- obd_enqueue_update_f upcall, void *cookie,
- struct ptlrpc_request_set *rqset)
+int osc_punch_send(struct obd_export *exp, struct obdo *oa,
+ obd_enqueue_update_f upcall, void *cookie)
{
- struct ptlrpc_request *req;
- struct osc_setattr_args *sa;
- struct ost_body *body;
- int rc;
- ENTRY;
+ struct ptlrpc_request *req;
+ struct osc_setattr_args *sa;
+ struct obd_import *imp = class_exp2cliimp(exp);
+ struct ost_body *body;
+ int rc;
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
- if (req == NULL)
- RETURN(-ENOMEM);
+ ENTRY;
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
- req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
- ptlrpc_at_set_req_timeout(req);
+ req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
+ if (rc < 0) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+
+ osc_set_io_portal(req);
+
+ ptlrpc_at_set_req_timeout(req);
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body);
- lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+
+ lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
ptlrpc_request_set_replen(req);
- req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
- CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
- sa = ptlrpc_req_async_args(req);
+ req->rq_interpret_reply = osc_setattr_interpret;
+ sa = ptlrpc_req_async_args(sa, req);
sa->sa_oa = oa;
sa->sa_upcall = upcall;
sa->sa_cookie = cookie;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
+
+ ptlrpcd_add_req(req);
RETURN(0);
}
+EXPORT_SYMBOL(osc_punch_send);
static int osc_sync_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- void *arg, int rc)
+ struct ptlrpc_request *req, void *args, int rc)
{
- struct osc_fsync_args *fa = arg;
- struct ost_body *body;
- struct cl_attr *attr = &osc_env_info(env)->oti_attr;
- unsigned long valid = 0;
- struct cl_object *obj;
+ struct osc_fsync_args *fa = args;
+ struct ost_body *body;
+ struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+ unsigned long valid = 0;
+ struct cl_object *obj;
ENTRY;
if (rc != 0)
ptlrpc_request_set_replen(req);
req->rq_interpret_reply = osc_sync_interpret;
- CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
- fa = ptlrpc_req_async_args(req);
+ fa = ptlrpc_req_async_args(fa, req);
fa->fa_obj = obj;
fa->fa_oa = oa;
fa->fa_upcall = upcall;
fa->fa_cookie = cookie;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
+ ptlrpc_set_add_req(rqset, req);
RETURN (0);
}
}
static int osc_destroy_interpret(const struct lu_env *env,
- struct ptlrpc_request *req, void *data,
- int rc)
+ struct ptlrpc_request *req, void *args, int rc)
{
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
atomic_dec(&cli->cl_destroy_in_flight);
wake_up(&cli->cl_destroy_waitq);
+
return 0;
}
struct client_obd *cli = &exp->exp_obd->u.cli;
struct ptlrpc_request *req;
struct ost_body *body;
- struct list_head cancels = LIST_HEAD_INIT(cancels);
+ LIST_HEAD(cancels);
int rc, count;
ENTRY;
req->rq_interpret_reply = osc_destroy_interpret;
if (!osc_can_send_destroy(cli)) {
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-
/*
* Wait until the number of on-going destroy RPCs drops
* under max_rpc_in_flight
*/
- l_wait_event_exclusive(cli->cl_destroy_waitq,
- osc_can_send_destroy(cli), &lwi);
+ rc = l_wait_event_abortable_exclusive(
+ cli->cl_destroy_waitq,
+ osc_can_send_destroy(cli));
+ if (rc) {
+ ptlrpc_req_finished(req);
+ RETURN(-EINTR);
+ }
}
/* Do not wait for response */
oa->o_dirty = cli->cl_dirty_grant;
else
oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
- if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
- cli->cl_dirty_max_pages)) {
- CERROR("dirty %lu - %lu > dirty_max %lu\n",
- cli->cl_dirty_pages, cli->cl_dirty_transit,
+ if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
+ CERROR("dirty %lu > dirty_max %lu\n",
+ cli->cl_dirty_pages,
cli->cl_dirty_max_pages);
oa->o_undirty = 0;
- } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
- atomic_long_read(&obd_dirty_transit_pages) >
+ } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
(long)(obd_max_dirty_pages + 1))) {
/* The atomic_read() allowing the atomic_inc() are
* not covered by a lock thus they may safely race and trip
* this CERROR() unless we add in a small fudge factor (+1). */
- CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
+ CERROR("%s: dirty %ld > system dirty_max %ld\n",
cli_name(cli), atomic_long_read(&obd_dirty_pages),
- atomic_long_read(&obd_dirty_transit_pages),
obd_max_dirty_pages);
oa->o_undirty = 0;
} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
oa->o_undirty = 0;
} else {
unsigned long nrpages;
+ unsigned long undirty;
nrpages = cli->cl_max_pages_per_rpc;
nrpages *= cli->cl_max_rpcs_in_flight + 1;
nrpages = max(nrpages, cli->cl_dirty_max_pages);
- oa->o_undirty = nrpages << PAGE_SHIFT;
+ undirty = nrpages << PAGE_SHIFT;
if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
GRANT_PARAM)) {
int nrextents;
* grant space */
nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
cli->cl_max_extent_pages;
- oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
+ undirty += nrextents * cli->cl_grant_extent_tax;
}
+ /* Do not ask for more than OBD_MAX_GRANT - a margin for server
+ * to add extent tax, etc.
+ */
+ oa->o_undirty = min(undirty, OBD_MAX_GRANT &
+ ~(PTLRPC_MAX_BRW_SIZE * 4UL));
}
oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
oa->o_dropped = cli->cl_lost_grant;
void osc_update_next_shrink(struct client_obd *cli)
{
- cli->cl_next_shrink_grant =
- cfs_time_shift(cli->cl_grant_shrink_interval);
- CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
- cli->cl_next_shrink_grant);
+ cli->cl_next_shrink_grant = ktime_get_seconds() +
+ cli->cl_grant_shrink_interval;
+
+ CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
+ cli->cl_next_shrink_grant);
}
static void __osc_update_grant(struct client_obd *cli, u64 grant)
}
}
-static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
- u32 keylen, void *key,
- u32 vallen, void *val,
- struct ptlrpc_request_set *set);
+/**
+ * grant thread data for shrinking space.
+ */
+struct grant_thread_data {
+ struct list_head gtd_clients;
+ struct mutex gtd_mutex;
+ unsigned long gtd_stopped:1;
+};
+static struct grant_thread_data client_gtd;
static int osc_shrink_grant_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- void *aa, int rc)
+ struct ptlrpc_request *req,
+ void *args, int rc)
{
- struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
- struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
- struct ost_body *body;
+ struct osc_grant_args *aa = args;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ struct ost_body *body;
- if (rc != 0) {
- __osc_update_grant(cli, oa->o_grant);
- GOTO(out, rc);
- }
+ if (rc != 0) {
+ __osc_update_grant(cli, aa->aa_oa->o_grant);
+ GOTO(out, rc);
+ }
- body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body);
- osc_update_grant(cli, body);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ LASSERT(body);
+ osc_update_grant(cli, body);
out:
- OBDO_FREE(oa);
- return rc;
+ OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
+ aa->aa_oa = NULL;
+
+ return rc;
}
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
osc_announce_cached(cli, &body->oa, 0);
spin_lock(&cli->cl_loi_list_lock);
+ if (target_bytes >= cli->cl_avail_grant) {
+ /* available grant has changed since target calculation */
+ spin_unlock(&cli->cl_loi_list_lock);
+ GOTO(out_free, rc = 0);
+ }
body->oa.o_grant = cli->cl_avail_grant - target_bytes;
cli->cl_avail_grant = target_bytes;
spin_unlock(&cli->cl_loi_list_lock);
sizeof(*body), body, NULL);
if (rc != 0)
__osc_update_grant(cli, body->oa.o_grant);
+out_free:
OBD_FREE_PTR(body);
RETURN(rc);
}
static int osc_should_shrink_grant(struct client_obd *client)
{
- cfs_time_t time = cfs_time_current();
- cfs_time_t next_shrink = client->cl_next_shrink_grant;
+ time64_t next_shrink = client->cl_next_shrink_grant;
- if ((client->cl_import->imp_connect_data.ocd_connect_flags &
- OBD_CONNECT_GRANT_SHRINK) == 0)
- return 0;
+ if (client->cl_import == NULL)
+ return 0;
+
+ if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
+ client->cl_import->imp_grant_shrink_disabled) {
+ osc_update_next_shrink(client);
+ return 0;
+ }
- if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
+ if (ktime_get_seconds() >= next_shrink - 5) {
/* Get the current RPC size directly, instead of going via:
* cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
* Keep comment here so that it can be found by searching. */
return 0;
}
-static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
+#define GRANT_SHRINK_RPC_BATCH 100
+
+static struct delayed_work work;
+
+static void osc_grant_work_handler(struct work_struct *data)
{
- struct client_obd *client;
+ struct client_obd *cli;
+ int rpc_sent;
+ bool init_next_shrink = true;
+ time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
+
+ rpc_sent = 0;
+ mutex_lock(&client_gtd.gtd_mutex);
+ list_for_each_entry(cli, &client_gtd.gtd_clients,
+ cl_grant_chain) {
+ if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
+ osc_should_shrink_grant(cli)) {
+ osc_shrink_grant(cli);
+ rpc_sent++;
+ }
- list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
- if (osc_should_shrink_grant(client))
- osc_shrink_grant(client);
+ if (!init_next_shrink) {
+ if (cli->cl_next_shrink_grant < next_shrink &&
+ cli->cl_next_shrink_grant > ktime_get_seconds())
+ next_shrink = cli->cl_next_shrink_grant;
+ } else {
+ init_next_shrink = false;
+ next_shrink = cli->cl_next_shrink_grant;
+ }
+ }
+ mutex_unlock(&client_gtd.gtd_mutex);
+
+ if (client_gtd.gtd_stopped == 1)
+ return;
+
+ if (next_shrink > ktime_get_seconds()) {
+ time64_t delay = next_shrink - ktime_get_seconds();
+
+ schedule_delayed_work(&work, cfs_time_seconds(delay));
+ } else {
+ schedule_work(&work.work);
}
- return 0;
}
-static int osc_add_shrink_grant(struct client_obd *client)
+void osc_schedule_grant_work(void)
{
- int rc;
+ cancel_delayed_work_sync(&work);
+ schedule_work(&work.work);
+}
+
+/**
+ * Start grant thread for returning grant to server for idle clients.
+ */
+static int osc_start_grant_work(void)
+{
+ client_gtd.gtd_stopped = 0;
+ mutex_init(&client_gtd.gtd_mutex);
+ INIT_LIST_HEAD(&client_gtd.gtd_clients);
+
+ INIT_DELAYED_WORK(&work, osc_grant_work_handler);
+ schedule_work(&work.work);
- rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
- TIMEOUT_GRANT,
- osc_grant_shrink_grant_cb, NULL,
- &client->cl_grant_shrink_list);
- if (rc) {
- CERROR("add grant client %s error %d\n", cli_name(client), rc);
- return rc;
- }
- CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
- osc_update_next_shrink(client);
return 0;
}
-static int osc_del_shrink_grant(struct client_obd *client)
+static void osc_stop_grant_work(void)
+{
+ client_gtd.gtd_stopped = 1;
+ cancel_delayed_work_sync(&work);
+}
+
+static void osc_add_grant_list(struct client_obd *client)
{
- return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
- TIMEOUT_GRANT);
+ mutex_lock(&client_gtd.gtd_mutex);
+ list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
+ mutex_unlock(&client_gtd.gtd_mutex);
}
-static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+static void osc_del_grant_list(struct client_obd *client)
+{
+ if (list_empty(&client->cl_grant_chain))
+ return;
+
+ mutex_lock(&client_gtd.gtd_mutex);
+ list_del_init(&client->cl_grant_chain);
+ mutex_unlock(&client_gtd.gtd_mutex);
+}
+
+void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
/*
* ocd_grant is the total grant amount we're expect to hold: if we've
cli->cl_dirty_pages << PAGE_SHIFT;
}
- if (cli->cl_avail_grant < 0) {
- CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
- cli_name(cli), cli->cl_avail_grant,
- ocd->ocd_grant, cli->cl_dirty_pages << PAGE_SHIFT);
- /* workaround for servers which do not have the patch from
- * LU-2679 */
- cli->cl_avail_grant = ocd->ocd_grant;
- }
-
if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
u64 size;
int chunk_mask;
}
spin_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
- "chunk bits: %d cl_max_extent_pages: %d\n",
- cli_name(cli),
- cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
- cli->cl_max_extent_pages);
+ CDEBUG(D_CACHE,
+ "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
+ cli_name(cli),
+ cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
+ cli->cl_max_extent_pages);
- if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
- list_empty(&cli->cl_grant_shrink_list))
- osc_add_shrink_grant(cli);
+ if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
+ osc_add_grant_list(cli);
}
+EXPORT_SYMBOL(osc_init_grant);
/* We assume that the reason this OSC got a short read is because it read
* beyond the end of a stripe file; i.e. lustre is reading a sparse file
/* return error if any niobuf was in error */
for (i = 0; i < niocount; i++) {
- if ((int)remote_rcs[i] < 0)
- return(remote_rcs[i]);
+ if ((int)remote_rcs[i] < 0) {
+ CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
+ i, remote_rcs[i], req);
+ return remote_rcs[i];
+ }
if (remote_rcs[i] != 0) {
CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
return(-EPROTO);
}
}
-
- if (req->rq_bulk->bd_nob_transferred != requested_nob) {
+ if (req->rq_bulk != NULL &&
+ req->rq_bulk->bd_nob_transferred != requested_nob) {
CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
req->rq_bulk->bd_nob_transferred, requested_nob);
return(-EPROTO);
* safe to combine */
if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
- "report this at https://jira.hpdd.intel.com/\n",
+ "report this at https://jira.whamcloud.com/\n",
p1->flag, p2->flag);
}
return 0;
return (p1->off + p1->count == p2->off);
}
-static u32 osc_checksum_bulk(int nob, size_t pg_count,
+#if IS_ENABLED(CONFIG_CRC_T10DIF)
+static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
+ size_t pg_count, struct brw_page **pga,
+ int opc, obd_dif_csum_fn *fn,
+ int sector_size,
+ u32 *check_sum)
+{
+ struct ahash_request *req;
+ /* Use Adler as the default checksum type on top of DIF tags */
+ unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
+ struct page *__page;
+ unsigned char *buffer;
+ __u16 *guard_start;
+ unsigned int bufsize;
+ int guard_number;
+ int used_number = 0;
+ int used;
+ u32 cksum;
+ int rc = 0;
+ int i = 0;
+
+ LASSERT(pg_count > 0);
+
+ __page = alloc_page(GFP_KERNEL);
+ if (__page == NULL)
+ return -ENOMEM;
+
+ req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+ if (IS_ERR(req)) {
+ rc = PTR_ERR(req);
+ CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
+ obd_name, cfs_crypto_hash_name(cfs_alg), rc);
+ GOTO(out, rc);
+ }
+
+ buffer = kmap(__page);
+ guard_start = (__u16 *)buffer;
+ guard_number = PAGE_SIZE / sizeof(*guard_start);
+ while (nob > 0 && pg_count > 0) {
+ unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+
+ /* corrupt the data before we compute the checksum, to
+ * simulate an OST->client data error */
+ if (unlikely(i == 0 && opc == OST_READ &&
+ OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
+ unsigned char *ptr = kmap(pga[i]->pg);
+ int off = pga[i]->off & ~PAGE_MASK;
+
+ memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
+ kunmap(pga[i]->pg);
+ }
+
+ /*
+ * The left guard number should be able to hold checksums of a
+ * whole page
+ */
+ rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
+ pga[i]->off & ~PAGE_MASK,
+ count,
+ guard_start + used_number,
+ guard_number - used_number,
+ &used, sector_size,
+ fn);
+ if (rc)
+ break;
+
+ used_number += used;
+ if (used_number == guard_number) {
+ cfs_crypto_hash_update_page(req, __page, 0,
+ used_number * sizeof(*guard_start));
+ used_number = 0;
+ }
+
+ nob -= pga[i]->count;
+ pg_count--;
+ i++;
+ }
+ kunmap(__page);
+ if (rc)
+ GOTO(out, rc);
+
+ if (used_number != 0)
+ cfs_crypto_hash_update_page(req, __page, 0,
+ used_number * sizeof(*guard_start));
+
+ bufsize = sizeof(cksum);
+ cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
+
+ /* For sending we only compute the wrong checksum instead
+ * of corrupting the data so it is still correct on a redo */
+ if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+ cksum++;
+
+ *check_sum = cksum;
+out:
+ __free_page(__page);
+ return rc;
+}
+#else /* !CONFIG_CRC_T10DIF */
+#define obd_dif_ip_fn NULL
+#define obd_dif_crc_fn NULL
+#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
+ -EOPNOTSUPP
+#endif /* CONFIG_CRC_T10DIF */
+
+static int osc_checksum_bulk(int nob, size_t pg_count,
struct brw_page **pga, int opc,
- cksum_type_t cksum_type)
+ enum cksum_types cksum_type,
+ u32 *cksum)
{
- u32 cksum;
int i = 0;
- struct cfs_crypto_hash_desc *hdesc;
+ struct ahash_request *req;
unsigned int bufsize;
- int err;
unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
LASSERT(pg_count > 0);
- hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
- if (IS_ERR(hdesc)) {
+ req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+ if (IS_ERR(req)) {
CERROR("Unable to initialize checksum hash %s\n",
cfs_crypto_hash_name(cfs_alg));
- return PTR_ERR(hdesc);
+ return PTR_ERR(req);
}
while (nob > 0 && pg_count > 0) {
memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
kunmap(pga[i]->pg);
}
- cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
+ cfs_crypto_hash_update_page(req, pga[i]->pg,
pga[i]->off & ~PAGE_MASK,
count);
LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
i++;
}
- bufsize = sizeof(cksum);
- err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+ bufsize = sizeof(*cksum);
+ cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
/* For sending we only compute the wrong checksum instead
* of corrupting the data so it is still correct on a redo */
if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
- cksum++;
+ (*cksum)++;
- return cksum;
+ return 0;
+}
+
+static int osc_checksum_bulk_rw(const char *obd_name,
+ enum cksum_types cksum_type,
+ int nob, size_t pg_count,
+ struct brw_page **pga, int opc,
+ u32 *check_sum)
+{
+ obd_dif_csum_fn *fn = NULL;
+ int sector_size = 0;
+ int rc;
+
+ ENTRY;
+ obd_t10_cksum2dif(cksum_type, &fn, §or_size);
+
+ if (fn)
+ rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
+ opc, fn, sector_size, check_sum);
+ else
+ rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
+ check_sum);
+
+ RETURN(rc);
}
static int
struct ost_body *body;
struct obd_ioobj *ioobj;
struct niobuf_remote *niobuf;
- int niocount, i, requested_nob, opc, rc;
+ int niocount, i, requested_nob, opc, rc, short_io_size = 0;
struct osc_brw_async_args *aa;
struct req_capsule *pill;
struct brw_page *pg_prev;
+ void *short_io_buf;
+ const char *obd_name = cli->cl_import->imp_obd->obd_name;
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
niocount * sizeof(*niobuf));
+ for (i = 0; i < page_count; i++)
+ short_io_size += pga[i]->count;
+
+ /* Check if read/write is small enough to be a short io. */
+ if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
+ !imp_connect_shortio(cli->cl_import))
+ short_io_size = 0;
+
+ req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
+ opc == OST_READ ? 0 : short_io_size);
+ if (opc == OST_READ)
+ req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
+ short_io_size);
+
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
if (rc) {
ptlrpc_request_free(req);
RETURN(rc);
}
- req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
- ptlrpc_at_set_req_timeout(req);
+ osc_set_io_portal(req);
+
+ ptlrpc_at_set_req_timeout(req);
/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
* retry logic */
req->rq_no_retry_einprogress = 1;
+ if (short_io_size != 0) {
+ desc = NULL;
+ short_io_buf = NULL;
+ goto no_bulk;
+ }
+
desc = ptlrpc_prep_bulk_imp(req, page_count,
cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
/* NB request now owns desc and will free it when it gets freed */
-
+no_bulk:
body = req_capsule_client_get(pill, &RMF_OST_BODY);
ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+ /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
+ * and from_kgid(), because they are asynchronous. Fortunately, variable
+ * oa contains valid o_uid and o_gid in these two operations.
+ * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
+ * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
+ * other process logic */
+ body->oa.o_uid = oa->o_uid;
+ body->oa.o_gid = oa->o_gid;
+
obdo_to_ioobj(oa, ioobj);
ioobj->ioo_bufcnt = niocount;
/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
* when the RPC is finally sent in ptlrpc_register_bulk(). It sends
* "max - 1" for old client compatibility sending "0", and also so the
* the actual maximum is a power-of-two number, not one less. LU-1431 */
- ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+ if (desc != NULL)
+ ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+ else /* short io */
+ ioobj_max_brw_set(ioobj, 0);
+
+ if (short_io_size != 0) {
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+ body->oa.o_valid |= OBD_MD_FLFLAGS;
+ body->oa.o_flags = 0;
+ }
+ body->oa.o_flags |= OBD_FL_SHORT_IO;
+ CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
+ short_io_size);
+ if (opc == OST_WRITE) {
+ short_io_buf = req_capsule_client_get(pill,
+ &RMF_SHORT_IO);
+ LASSERT(short_io_buf != NULL);
+ }
+ }
+
LASSERT(page_count > 0);
pg_prev = pga[0];
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
pg_prev->pg->index, pg_prev->off);
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK));
-
- desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
- requested_nob += pg->count;
+ if (short_io_size != 0 && opc == OST_WRITE) {
+ unsigned char *ptr = kmap_atomic(pg->pg);
+
+ LASSERT(short_io_size >= requested_nob + pg->count);
+ memcpy(short_io_buf + requested_nob,
+ ptr + poff,
+ pg->count);
+ kunmap_atomic(ptr);
+ } else if (short_io_size == 0) {
+ desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
+ pg->count);
+ }
+ requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
niobuf--;
!sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
/* store cl_cksum_type in a local variable since
* it can be changed via lprocfs */
- cksum_type_t cksum_type = cli->cl_cksum_type;
+ enum cksum_types cksum_type = cli->cl_cksum_type;
- if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
- oa->o_flags &= OBD_FL_LOCAL_MASK;
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
body->oa.o_flags = 0;
- }
- body->oa.o_flags |= cksum_type_pack(cksum_type);
+
+ body->oa.o_flags |= obd_cksum_type_pack(obd_name,
+ cksum_type);
body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
- body->oa.o_cksum = osc_checksum_bulk(requested_nob,
- page_count, pga,
- OST_WRITE,
- cksum_type);
+
+ rc = osc_checksum_bulk_rw(obd_name, cksum_type,
+ requested_nob, page_count,
+ pga, OST_WRITE,
+ &body->oa.o_cksum);
+ if (rc < 0) {
+ CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
+ rc);
+ GOTO(out, rc);
+ }
CDEBUG(D_PAGE, "checksum at write origin: %x\n",
body->oa.o_cksum);
+
/* save this in 'oa', too, for later checking */
oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
- oa->o_flags |= cksum_type_pack(cksum_type);
+ oa->o_flags |= obd_cksum_type_pack(obd_name,
+ cksum_type);
} else {
/* clear out the checksum flag, in case this is a
* resend but cl_checksum is no longer set. b=11238 */
!sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
body->oa.o_flags = 0;
- body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
+ body->oa.o_flags |= obd_cksum_type_pack(obd_name,
+ cli->cl_cksum_type);
body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
- }
- }
- ptlrpc_request_set_replen(req);
+ }
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- aa->aa_oa = oa;
- aa->aa_requested_nob = requested_nob;
- aa->aa_nio_count = niocount;
- aa->aa_page_count = page_count;
- aa->aa_resends = 0;
- aa->aa_ppga = pga;
- aa->aa_cli = cli;
+ /* Client cksum has been already copied to wire obdo in previous
+ * lustre_set_wire_obdo(), and in the case a bulk-read is being
+ * resent due to cksum error, this will allow Server to
+ * check+dump pages on its side */
+ }
+ ptlrpc_request_set_replen(req);
+
+ aa = ptlrpc_req_async_args(aa, req);
+ aa->aa_oa = oa;
+ aa->aa_requested_nob = requested_nob;
+ aa->aa_nio_count = niocount;
+ aa->aa_page_count = page_count;
+ aa->aa_resends = 0;
+ aa->aa_ppga = pga;
+ aa->aa_cli = cli;
INIT_LIST_HEAD(&aa->aa_oaps);
*reqp = req;
RETURN(rc);
}
-static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
- __u32 client_cksum, __u32 server_cksum, int nob,
- size_t page_count, struct brw_page **pga,
- cksum_type_t client_cksum_type)
-{
- __u32 new_cksum;
- char *msg;
- cksum_type_t cksum_type;
+char dbgcksum_file_name[PATH_MAX];
+
+static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
+ struct brw_page **pga, __u32 server_cksum,
+ __u32 client_cksum)
+{
+ struct file *filp;
+ int rc, i;
+ unsigned int len;
+ char *buf;
+
+ /* will only keep dump of pages on first error for the same range in
+ * file/fid, not during the resends/retries. */
+ snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
+ "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
+ (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
+ libcfs_debug_file_path_arr :
+ LIBCFS_DEBUG_FILE_PATH_DEFAULT),
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
+ pga[0]->off,
+ pga[page_count-1]->off + pga[page_count-1]->count - 1,
+ client_cksum, server_cksum);
+ filp = filp_open(dbgcksum_file_name,
+ O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
+ if (IS_ERR(filp)) {
+ rc = PTR_ERR(filp);
+ if (rc == -EEXIST)
+ CDEBUG(D_INFO, "%s: can't open to dump pages with "
+ "checksum error: rc = %d\n", dbgcksum_file_name,
+ rc);
+ else
+ CERROR("%s: can't open to dump pages with checksum "
+ "error: rc = %d\n", dbgcksum_file_name, rc);
+ return;
+ }
+
+ for (i = 0; i < page_count; i++) {
+ len = pga[i]->count;
+ buf = kmap(pga[i]->pg);
+ while (len != 0) {
+ rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
+ if (rc < 0) {
+ CERROR("%s: wanted to write %u but got %d "
+ "error\n", dbgcksum_file_name, len, rc);
+ break;
+ }
+ len -= rc;
+ buf += rc;
+ CDEBUG(D_INFO, "%s: wrote %d bytes\n",
+ dbgcksum_file_name, rc);
+ }
+ kunmap(pga[i]->pg);
+ }
+
+ rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
+ if (rc)
+ CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
+ filp_close(filp, NULL);
+}
+
+static int
+check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
+ __u32 client_cksum, __u32 server_cksum,
+ struct osc_brw_async_args *aa)
+{
+ const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
+ enum cksum_types cksum_type;
+ obd_dif_csum_fn *fn = NULL;
+ int sector_size = 0;
+ __u32 new_cksum;
+ char *msg;
+ int rc;
if (server_cksum == client_cksum) {
CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
return 0;
}
- cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
- oa->o_flags : 0);
- new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
- cksum_type);
+ if (aa->aa_cli->cl_checksum_dump)
+ dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
+ server_cksum, client_cksum);
+
+ cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
+ oa->o_flags : 0);
+
+ switch (cksum_type) {
+ case OBD_CKSUM_T10IP512:
+ fn = obd_dif_ip_fn;
+ sector_size = 512;
+ break;
+ case OBD_CKSUM_T10IP4K:
+ fn = obd_dif_ip_fn;
+ sector_size = 4096;
+ break;
+ case OBD_CKSUM_T10CRC512:
+ fn = obd_dif_crc_fn;
+ sector_size = 512;
+ break;
+ case OBD_CKSUM_T10CRC4K:
+ fn = obd_dif_crc_fn;
+ sector_size = 4096;
+ break;
+ default:
+ break;
+ }
- if (cksum_type != client_cksum_type)
+ if (fn)
+ rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
+ aa->aa_page_count, aa->aa_ppga,
+ OST_WRITE, fn, sector_size,
+ &new_cksum);
+ else
+ rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
+ aa->aa_ppga, OST_WRITE, cksum_type,
+ &new_cksum);
+
+ if (rc < 0)
+ msg = "failed to calculate the client write checksum";
+ else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
msg = "the server did not use the checksum type specified in "
"the original request - likely a protocol problem";
else if (new_cksum == server_cksum)
msg = "changed in transit AND doesn't match the original - "
"likely false positive due to mmap IO (bug 11742)";
- LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
- " object "DOSTID" extent [%llu-%llu]\n",
- msg, libcfs_nid2str(peer->nid),
+ LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
+ DFID " object "DOSTID" extent [%llu-%llu], original "
+ "client csum %x (type %x), server csum %x (type %x),"
+ " client csum now %x\n",
+ obd_name, msg, libcfs_nid2str(peer->nid),
oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
- POSTID(&oa->o_oi), pga[0]->off,
- pga[page_count-1]->off + pga[page_count-1]->count - 1);
- CERROR("original client csum %x (type %x), server csum %x (type %x), "
- "client csum now %x\n", client_cksum, client_cksum_type,
- server_cksum, cksum_type, new_cksum);
+ POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
+ aa->aa_ppga[aa->aa_page_count - 1]->off +
+ aa->aa_ppga[aa->aa_page_count-1]->count - 1,
+ client_cksum,
+ obd_cksum_type_unpack(aa->aa_oa->o_flags),
+ server_cksum, cksum_type, new_cksum);
return 1;
}
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
- struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
- const lnet_process_id_t *peer =
- &req->rq_import->imp_connection->c_peer;
- struct client_obd *cli = aa->aa_cli;
- struct ost_body *body;
+ struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+ struct client_obd *cli = aa->aa_cli;
+ const char *obd_name = cli->cl_import->imp_obd->obd_name;
+ const struct lnet_process_id *peer =
+ &req->rq_import->imp_connection->c_peer;
+ struct ost_body *body;
u32 client_cksum = 0;
- ENTRY;
- if (rc < 0 && rc != -EDQUOT) {
- DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
- RETURN(rc);
- }
+ ENTRY;
- LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
- body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL) {
- DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
- RETURN(-EPROTO);
- }
+ if (rc < 0 && rc != -EDQUOT) {
+ DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
+ RETURN(rc);
+ }
- /* set/clear over quota flag for a uid/gid */
- if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
- body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
- unsigned int qid[LL_MAXQUOTAS] =
- {body->oa.o_uid, body->oa.o_gid};
+ LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL) {
+ DEBUG_REQ(D_INFO, req, "cannot unpack body");
+ RETURN(-EPROTO);
+ }
- CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
- body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
- body->oa.o_flags);
- osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
- }
+ /* set/clear over quota flag for a uid/gid/projid */
+ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
+ body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
+ unsigned qid[LL_MAXQUOTAS] = {
+ body->oa.o_uid, body->oa.o_gid,
+ body->oa.o_projid };
+ CDEBUG(D_QUOTA,
+ "setdq for [%u %u %u] with valid %#llx, flags %x\n",
+ body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
+ body->oa.o_valid, body->oa.o_flags);
+ osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
+ body->oa.o_flags);
+ }
- osc_update_grant(cli, body);
+ osc_update_grant(cli, body);
- if (rc < 0)
- RETURN(rc);
+ if (rc < 0)
+ RETURN(rc);
- if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
- client_cksum = aa->aa_oa->o_cksum; /* save for later */
+ if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
+ client_cksum = aa->aa_oa->o_cksum; /* save for later */
- if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
- if (rc > 0) {
- CERROR("Unexpected +ve rc %d\n", rc);
- RETURN(-EPROTO);
- }
- LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
+ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+ if (rc > 0) {
+ CERROR("%s: unexpected positive size %d\n",
+ obd_name, rc);
+ RETURN(-EPROTO);
+ }
- if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
- RETURN(-EAGAIN);
+ if (req->rq_bulk != NULL &&
+ sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+ RETURN(-EAGAIN);
- if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
- check_write_checksum(&body->oa, peer, client_cksum,
- body->oa.o_cksum, aa->aa_requested_nob,
- aa->aa_page_count, aa->aa_ppga,
- cksum_type_unpack(aa->aa_oa->o_flags)))
- RETURN(-EAGAIN);
+ if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
+ check_write_checksum(&body->oa, peer, client_cksum,
+ body->oa.o_cksum, aa))
+ RETURN(-EAGAIN);
- rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
- aa->aa_page_count, aa->aa_ppga);
- GOTO(out, rc);
- }
+ rc = check_write_rcs(req, aa->aa_requested_nob,
+ aa->aa_nio_count, aa->aa_page_count,
+ aa->aa_ppga);
+ GOTO(out, rc);
+ }
- /* The rest of this function executes only for OST_READs */
+ /* The rest of this function executes only for OST_READs */
+
+ if (req->rq_bulk == NULL) {
+ rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
+ RCL_SERVER);
+ LASSERT(rc == req->rq_status);
+ } else {
+ /* if unwrap_bulk failed, return -EAGAIN to retry */
+ rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+ }
+ if (rc < 0)
+ GOTO(out, rc = -EAGAIN);
- /* if unwrap_bulk failed, return -EAGAIN to retry */
- rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
- if (rc < 0)
- GOTO(out, rc = -EAGAIN);
+ if (rc > aa->aa_requested_nob) {
+ CERROR("%s: unexpected size %d, requested %d\n", obd_name,
+ rc, aa->aa_requested_nob);
+ RETURN(-EPROTO);
+ }
- if (rc > aa->aa_requested_nob) {
- CERROR("Unexpected rc %d (%d requested)\n", rc,
- aa->aa_requested_nob);
- RETURN(-EPROTO);
- }
+ if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
+ CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
+ rc, req->rq_bulk->bd_nob_transferred);
+ RETURN(-EPROTO);
+ }
- if (rc != req->rq_bulk->bd_nob_transferred) {
- CERROR ("Unexpected rc %d (%d transferred)\n",
- rc, req->rq_bulk->bd_nob_transferred);
- return (-EPROTO);
- }
+ if (req->rq_bulk == NULL) {
+ /* short io */
+ int nob, pg_count, i = 0;
+ unsigned char *buf;
+
+ CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
+ pg_count = aa->aa_page_count;
+ buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
+ rc);
+ nob = rc;
+ while (nob > 0 && pg_count > 0) {
+ unsigned char *ptr;
+ int count = aa->aa_ppga[i]->count > nob ?
+ nob : aa->aa_ppga[i]->count;
+
+ CDEBUG(D_CACHE, "page %p count %d\n",
+ aa->aa_ppga[i]->pg, count);
+ ptr = kmap_atomic(aa->aa_ppga[i]->pg);
+ memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+ count);
+ kunmap_atomic((void *) ptr);
+
+ buf += count;
+ nob -= count;
+ i++;
+ pg_count--;
+ }
+ }
if (rc < aa->aa_requested_nob)
handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
u32 server_cksum = body->oa.o_cksum;
char *via = "";
char *router = "";
- cksum_type_t cksum_type;
-
- cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
- body->oa.o_flags : 0);
- client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
- aa->aa_ppga, OST_READ,
- cksum_type);
-
- if (peer->nid != req->rq_bulk->bd_sender) {
+ enum cksum_types cksum_type;
+ u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
+ body->oa.o_flags : 0;
+
+ cksum_type = obd_cksum_type_unpack(o_flags);
+ rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
+ aa->aa_page_count, aa->aa_ppga,
+ OST_READ, &client_cksum);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ if (req->rq_bulk != NULL &&
+ peer->nid != req->rq_bulk->bd_sender) {
via = " via ";
router = libcfs_nid2str(req->rq_bulk->bd_sender);
}
if (server_cksum != client_cksum) {
+ struct ost_body *clbody;
+ u32 page_count = aa->aa_page_count;
+
+ clbody = req_capsule_client_get(&req->rq_pill,
+ &RMF_OST_BODY);
+ if (cli->cl_checksum_dump)
+ dump_all_bulk_pages(&clbody->oa, page_count,
+ aa->aa_ppga, server_cksum,
+ client_cksum);
+
LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
"%s%s%s inode "DFID" object "DOSTID
- " extent [%llu-%llu]\n",
- req->rq_import->imp_obd->obd_name,
+ " extent [%llu-%llu], client %x, "
+ "server %x, cksum_type %x\n",
+ obd_name,
libcfs_nid2str(peer->nid),
via, router,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_seq : (__u64)0,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_oid : 0,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_ver : 0,
+ clbody->oa.o_valid & OBD_MD_FLFID ?
+ clbody->oa.o_parent_seq : 0ULL,
+ clbody->oa.o_valid & OBD_MD_FLFID ?
+ clbody->oa.o_parent_oid : 0,
+ clbody->oa.o_valid & OBD_MD_FLFID ?
+ clbody->oa.o_parent_ver : 0,
POSTID(&body->oa.o_oi),
aa->aa_ppga[0]->off,
- aa->aa_ppga[aa->aa_page_count-1]->off +
- aa->aa_ppga[aa->aa_page_count-1]->count -
- 1);
- CERROR("client %x, server %x, cksum_type %x\n",
- client_cksum, server_cksum, cksum_type);
+ aa->aa_ppga[page_count-1]->off +
+ aa->aa_ppga[page_count-1]->count - 1,
+ client_cksum, server_cksum,
+ cksum_type);
cksum_counter = 0;
aa->aa_oa->o_cksum = client_cksum;
rc = -EAGAIN;
CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
rc = 0;
}
- } else if (unlikely(client_cksum)) {
- static int cksum_missed;
-
- cksum_missed++;
- if ((cksum_missed & (-cksum_missed)) == cksum_missed)
- CERROR("Checksum %u requested from %s but not sent\n",
- cksum_missed, libcfs_nid2str(peer->nid));
- } else {
- rc = 0;
- }
+ } else if (unlikely(client_cksum)) {
+ static int cksum_missed;
+
+ cksum_missed++;
+ if ((cksum_missed & (-cksum_missed)) == cksum_missed)
+ CERROR("%s: checksum %u requested from %s but not sent\n",
+ obd_name, cksum_missed,
+ libcfs_nid2str(peer->nid));
+ } else {
+ rc = 0;
+ }
out:
if (rc >= 0)
lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
aa->aa_oa, &body->oa);
- RETURN(rc);
+ RETURN(rc);
}
static int osc_brw_redo_request(struct ptlrpc_request *request,
struct osc_brw_async_args *aa, int rc)
{
- struct ptlrpc_request *new_req;
- struct osc_brw_async_args *new_aa;
- struct osc_async_page *oap;
- ENTRY;
+ struct ptlrpc_request *new_req;
+ struct osc_brw_async_args *new_aa;
+ struct osc_async_page *oap;
+ ENTRY;
+	/* The message below is checked in replay-ost-single.sh test_8ae */
DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
"redo for recoverable error %d", rc);
RETURN(rc);
list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
- if (oap->oap_request != NULL) {
- LASSERTF(request == oap->oap_request,
- "request %p != oap_request %p\n",
- request, oap->oap_request);
- if (oap->oap_interrupted) {
- ptlrpc_req_finished(new_req);
- RETURN(-EINTR);
- }
- }
- }
- /* New request takes over pga and oaps from old request.
- * Note that copying a list_head doesn't work, need to move it... */
- aa->aa_resends++;
- new_req->rq_interpret_reply = request->rq_interpret_reply;
- new_req->rq_async_args = request->rq_async_args;
+ if (oap->oap_request != NULL) {
+ LASSERTF(request == oap->oap_request,
+ "request %p != oap_request %p\n",
+ request, oap->oap_request);
+ }
+ }
+ /*
+ * New request takes over pga and oaps from old request.
+ * Note that copying a list_head doesn't work, need to move it...
+ */
+ aa->aa_resends++;
+ new_req->rq_interpret_reply = request->rq_interpret_reply;
+ new_req->rq_async_args = request->rq_async_args;
new_req->rq_commit_cb = request->rq_commit_cb;
/* cap resend delay to the current request timeout, this is similar to
* what ptlrpc does (see after_reply()) */
if (aa->aa_resends > new_req->rq_timeout)
- new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
+ new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
else
- new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
+ new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
new_req->rq_generation_set = 1;
new_req->rq_import_generation = request->rq_import_generation;
- new_aa = ptlrpc_req_async_args(new_req);
+ new_aa = ptlrpc_req_async_args(new_aa, new_req);
INIT_LIST_HEAD(&new_aa->aa_oaps);
list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
}
static int brw_interpret(const struct lu_env *env,
- struct ptlrpc_request *req, void *data, int rc)
+ struct ptlrpc_request *req, void *args, int rc)
{
- struct osc_brw_async_args *aa = data;
+ struct osc_brw_async_args *aa = args;
struct osc_extent *ext;
struct osc_extent *tmp;
struct client_obd *cli = aa->aa_cli;
- ENTRY;
+ unsigned long transferred = 0;
+
+ ENTRY;
- rc = osc_brw_fini_request(req, rc);
- CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
- /* When server return -EINPROGRESS, client should always retry
- * regardless of the number of times the bulk was resent already. */
- if (osc_recoverable_error(rc)) {
+ rc = osc_brw_fini_request(req, rc);
+ CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+ /*
+ * When server returns -EINPROGRESS, client should always retry
+ * regardless of the number of times the bulk was resent already.
+ */
+ if (osc_recoverable_error(rc) && !req->rq_no_delay) {
if (req->rq_import_generation !=
req->rq_import->imp_generation) {
CDEBUG(D_HA, "%s: resend cross eviction for object: "
cl_object_attr_update(env, obj, attr, valid);
cl_object_attr_unlock(obj);
}
- OBDO_FREE(aa->aa_oa);
+ OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
+ aa->aa_oa = NULL;
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
osc_inc_unstable_pages(req);
list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
list_del_init(&ext->oe_link);
- osc_extent_finish(env, ext, 1, rc);
+ osc_extent_finish(env, ext, 1,
+ rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
}
LASSERT(list_empty(&aa->aa_exts));
LASSERT(list_empty(&aa->aa_oaps));
+ transferred = (req->rq_bulk == NULL ? /* short io */
+ aa->aa_requested_nob :
+ req->rq_bulk->bd_nob_transferred);
+
osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
- ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+ ptlrpc_lprocfs_brw(req, transferred);
spin_lock(&cli->cl_loi_list_lock);
/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
int mem_tight = 0;
int page_count = 0;
bool soft_sync = false;
- bool interrupted = false;
+ bool ndelay = false;
int i;
int grant = 0;
int rc;
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ __u32 layout_version = 0;
+ LIST_HEAD(rpc_list);
struct ost_body *body;
ENTRY;
LASSERT(!list_empty(ext_list));
mem_tight |= ext->oe_memalloc;
grant += ext->oe_grants;
page_count += ext->oe_nr_pages;
+ layout_version = max(layout_version, ext->oe_layout_version);
if (obj == NULL)
obj = ext->oe_obj;
}
if (pga == NULL)
GOTO(out, rc = -ENOMEM);
- OBDO_ALLOC(oa);
+ OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
if (oa == NULL)
GOTO(out, rc = -ENOMEM);
else
LASSERT(oap->oap_page_off + oap->oap_count ==
PAGE_SIZE);
- if (oap->oap_interrupted)
- interrupted = true;
}
+ if (ext->oe_ndelay)
+ ndelay = true;
}
/* first page in the list */
crattr->cra_oa = oa;
cl_req_attr_set(env, osc2cl(obj), crattr);
- if (cmd == OBD_BRW_WRITE)
+ if (cmd == OBD_BRW_WRITE) {
oa->o_grant_used = grant;
+ if (layout_version > 0) {
+ CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
+ PFID(&oa->o_oi.oi_fid), layout_version);
+
+ oa->o_layout_version = layout_version;
+ oa->o_valid |= OBD_MD_LAYOUT_VERSION;
+ }
+ }
sort_brw_pages(pga, page_count);
rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
req->rq_interpret_reply = brw_interpret;
req->rq_memalloc = mem_tight != 0;
oap->oap_request = ptlrpc_request_addref(req);
- if (interrupted && !req->rq_intr)
- ptlrpc_mark_interrupted(req);
+ if (ndelay) {
+ req->rq_no_resend = req->rq_no_delay = 1;
+		/* probably set a shorter timeout value
+		 * to handle ETIMEDOUT in brw_interpret() correctly. */
+ /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
+ }
/* Need to update the timestamps after the request is built in case
* we race with setattr (locally or in queue at OST). If OST gets
* way to do this in a single call. bug 10150 */
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
crattr->cra_oa = &body->oa;
- crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
+ crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
cl_req_attr_set(env, osc2cl(obj), crattr);
lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
+ aa = ptlrpc_req_async_args(aa, req);
INIT_LIST_HEAD(&aa->aa_oaps);
list_splice_init(&rpc_list, &aa->aa_oaps);
INIT_LIST_HEAD(&aa->aa_exts);
}
spin_unlock(&cli->cl_loi_list_lock);
- DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
+ DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
page_count, aa, cli->cl_r_in_flight,
cli->cl_w_in_flight);
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
LASSERT(req == NULL);
if (oa)
- OBDO_FREE(oa);
+ OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
if (pga)
OBD_FREE(pga, sizeof(*pga) * page_count);
/* this should happen rarely and is pretty bad, it makes the
return set;
}
-static int osc_enqueue_fini(struct ptlrpc_request *req,
- osc_enqueue_upcall_f upcall, void *cookie,
- struct lustre_handle *lockh, enum ldlm_mode mode,
- __u64 *flags, int agl, int errcode)
+int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
+ void *cookie, struct lustre_handle *lockh,
+ enum ldlm_mode mode, __u64 *flags, bool speculative,
+ int errcode)
{
bool intent = *flags & LDLM_FL_HAS_INTENT;
int rc;
ptlrpc_status_ntoh(rep->lock_policy_res1);
if (rep->lock_policy_res1)
errcode = rep->lock_policy_res1;
- if (!agl)
+ if (!speculative)
*flags |= LDLM_FL_LVB_READY;
} else if (errcode == ELDLM_OK) {
*flags |= LDLM_FL_LVB_READY;
if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
ldlm_lock_decref(lockh, mode);
- RETURN(rc);
+ RETURN(rc);
}
-static int osc_enqueue_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct osc_enqueue_args *aa, int rc)
+int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+ void *args, int rc)
{
+ struct osc_enqueue_args *aa = args;
struct ldlm_lock *lock;
struct lustre_handle *lockh = &aa->oa_lockh;
enum ldlm_mode mode = aa->oa_mode;
/* Let CP AST to grant the lock first. */
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
- if (aa->oa_agl) {
+ if (aa->oa_speculative) {
LASSERT(aa->oa_lvb == NULL);
LASSERT(aa->oa_flags == NULL);
aa->oa_flags = &flags;
lockh, rc);
/* Complete osc stuff. */
rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
- aa->oa_flags, aa->oa_agl, rc);
+ aa->oa_flags, aa->oa_speculative, rc);
- OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
ldlm_lock_decref(lockh, mode);
LDLM_LOCK_PUT(lock);
RETURN(rc);
}
-struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
-
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
* from the 2nd OSC before a lock from the 1st one. This does not deadlock with
* other synchronous requests, however keeping some locks and trying to obtain
* release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
__u64 *flags, union ldlm_policy_data *policy,
- struct ost_lvb *lvb, int kms_valid,
- osc_enqueue_upcall_f upcall, void *cookie,
- struct ldlm_enqueue_info *einfo,
- struct ptlrpc_request_set *rqset, int async, int agl)
+ struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
+ void *cookie, struct ldlm_enqueue_info *einfo,
+ struct ptlrpc_request_set *rqset, int async,
+ bool speculative)
{
struct obd_device *obd = exp->exp_obd;
struct lustre_handle lockh = { 0 };
policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
policy->l_extent.end |= ~PAGE_MASK;
- /*
- * kms is not valid when either object is completely fresh (so that no
- * locks are cached), or object was evicted. In the latter case cached
- * lock cannot be used, because it would prime inode state with
- * potentially stale LVB.
- */
- if (!kms_valid)
- goto no_match;
-
/* Next, search for already existing extent locks that will cover us */
/* If we're trying to read, we also search for an existing PW lock. The
* VFS and page cache already protect us locally, so lots of readers/
mode = einfo->ei_mode;
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
- if (agl == 0)
+ /* Normal lock requests must wait for the LVB to be ready before
+ * matching a lock; speculative lock requests do not need to,
+ * because they will not actually use the lock. */
+ if (!speculative)
match_flags |= LDLM_FL_LVB_READY;
if (intent != 0)
match_flags |= LDLM_FL_BLOCK_GRANTED;
RETURN(ELDLM_OK);
matched = ldlm_handle2lock(&lockh);
- if (agl) {
- /* AGL enqueues DLM locks speculatively. Therefore if
- * it already exists a DLM lock, it wll just inform the
- * caller to cancel the AGL process for this stripe. */
+ if (speculative) {
+ /* This DLM lock request is speculative, and does not
+ * have an associated IO request. Therefore if there
+			 * is already a DLM lock, it will just inform the
+			 * caller to cancel the request for this stripe. */
+ lock_res_and_lock(matched);
+ if (ldlm_extent_equal(&policy->l_extent,
+ &matched->l_policy_data.l_extent))
+ rc = -EEXIST;
+ else
+ rc = -ECANCELED;
+ unlock_res_and_lock(matched);
+
ldlm_lock_decref(&lockh, mode);
LDLM_LOCK_PUT(matched);
- RETURN(-ECANCELED);
+ RETURN(rc);
} else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
*flags |= LDLM_FL_LVB_READY;
}
}
-no_match:
if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
RETURN(-ENOLCK);
if (async) {
if (!rc) {
struct osc_enqueue_args *aa;
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- aa->oa_exp = exp;
- aa->oa_mode = einfo->ei_mode;
- aa->oa_type = einfo->ei_type;
+ aa = ptlrpc_req_async_args(aa, req);
+ aa->oa_exp = exp;
+ aa->oa_mode = einfo->ei_mode;
+ aa->oa_type = einfo->ei_type;
lustre_handle_copy(&aa->oa_lockh, &lockh);
- aa->oa_upcall = upcall;
- aa->oa_cookie = cookie;
- aa->oa_agl = !!agl;
- if (!agl) {
+ aa->oa_upcall = upcall;
+ aa->oa_cookie = cookie;
+ aa->oa_speculative = speculative;
+ if (!speculative) {
aa->oa_flags = flags;
aa->oa_lvb = lvb;
} else {
- /* AGL is essentially to enqueue an DLM lock
- * in advance, so we don't care about the
- * result of AGL enqueue. */
+ /* speculative locks are essentially to enqueue
+ * a DLM lock in advance, so we don't care
+ * about the result of the enqueue. */
aa->oa_lvb = NULL;
aa->oa_flags = NULL;
}
- req->rq_interpret_reply =
- (ptlrpc_interpterer_t)osc_enqueue_interpret;
- if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req);
- else
- ptlrpc_set_add_req(rqset, req);
+ req->rq_interpret_reply = osc_enqueue_interpret;
+ ptlrpc_set_add_req(rqset, req);
} else if (intent) {
ptlrpc_req_finished(req);
}
}
rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
- flags, agl, rc);
+ flags, speculative, rc);
if (intent)
ptlrpc_req_finished(req);
RETURN(rc);
}
-int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
- enum ldlm_type type, union ldlm_policy_data *policy,
- enum ldlm_mode mode, __u64 *flags, void *data,
+int osc_match_base(const struct lu_env *env, struct obd_export *exp,
+ struct ldlm_res_id *res_id, enum ldlm_type type,
+ union ldlm_policy_data *policy, enum ldlm_mode mode,
+ __u64 *flags, struct osc_object *obj,
struct lustre_handle *lockh, int unref)
{
struct obd_device *obd = exp->exp_obd;
if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
RETURN(rc);
- if (data != NULL) {
+ if (obj != NULL) {
struct ldlm_lock *lock = ldlm_handle2lock(lockh);
LASSERT(lock != NULL);
- if (!osc_set_lock_data(lock, data)) {
+ if (osc_set_lock_data(lock, obj)) {
+ lock_res_and_lock(lock);
+ if (!ldlm_is_lvb_cached(lock)) {
+ LASSERT(lock->l_ast_data == obj);
+ osc_lock_lvb_update(env, obj, lock, NULL);
+ ldlm_set_lvb_cached(lock);
+ }
+ unlock_res_and_lock(lock);
+ } else {
ldlm_lock_decref(lockh, rc);
rc = 0;
}
}
static int osc_statfs_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct osc_async_args *aa, int rc)
+ struct ptlrpc_request *req, void *args, int rc)
{
- struct obd_statfs *msfs;
- ENTRY;
+ struct osc_async_args *aa = args;
+ struct obd_statfs *msfs;
- if (rc == -EBADR)
- /* The request has in fact never been sent
- * due to issues at a higher level (LOV).
- * Exit immediately since the caller is
- * aware of the problem and takes care
- * of the clean up */
- RETURN(rc);
+ ENTRY;
+ if (rc == -EBADR)
+ /*
+ * The request has in fact never been sent due to issues at
+ * a higher level (LOV). Exit immediately since the caller
+ * is aware of the problem and takes care of the clean up.
+ */
+ RETURN(rc);
- if ((rc == -ENOTCONN || rc == -EAGAIN) &&
- (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
- GOTO(out, rc = 0);
+ if ((rc == -ENOTCONN || rc == -EAGAIN) &&
+ (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
+ GOTO(out, rc = 0);
- if (rc != 0)
- GOTO(out, rc);
+ if (rc != 0)
+ GOTO(out, rc);
- msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
- if (msfs == NULL) {
+ msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
+ if (msfs == NULL)
GOTO(out, rc = -EPROTO);
- }
- *aa->aa_oi->oi_osfs = *msfs;
+ *aa->aa_oi->oi_osfs = *msfs;
out:
- rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
- RETURN(rc);
+ rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+
+ RETURN(rc);
}
static int osc_statfs_async(struct obd_export *exp,
- struct obd_info *oinfo, __u64 max_age,
+ struct obd_info *oinfo, time64_t max_age,
struct ptlrpc_request_set *rqset)
{
struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req;
struct osc_async_args *aa;
- int rc;
+ int rc;
ENTRY;
+ if (obd->obd_osfs_age >= max_age) {
+ CDEBUG(D_SUPER,
+ "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
+ obd->obd_name, &obd->obd_osfs,
+ obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
+ obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
+ spin_lock(&obd->obd_osfs_lock);
+ memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
+ spin_unlock(&obd->obd_osfs_lock);
+ oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
+ if (oinfo->oi_cb_up)
+ oinfo->oi_cb_up(oinfo, 0);
+
+ RETURN(0);
+ }
+
/* We could possibly pass max_age in the request (as an absolute
* timestamp or a "seconds.usec ago") so the target can avoid doing
* extra calls into the filesystem if that isn't necessary (e.g.
ptlrpc_request_free(req);
RETURN(rc);
}
- ptlrpc_request_set_replen(req);
- req->rq_request_portal = OST_CREATE_PORTAL;
- ptlrpc_at_set_req_timeout(req);
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OST_CREATE_PORTAL;
+ ptlrpc_at_set_req_timeout(req);
- if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
- /* procfs requests not want stat in wait for avoid deadlock */
- req->rq_no_resend = 1;
- req->rq_no_delay = 1;
- }
+ if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
+		/* procfs requests should not wait on stat, to avoid deadlock */
+ req->rq_no_resend = 1;
+ req->rq_no_delay = 1;
+ }
- req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
- CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- aa->aa_oi = oinfo;
+ req->rq_interpret_reply = osc_statfs_interpret;
+ aa = ptlrpc_req_async_args(aa, req);
+ aa->aa_oi = oinfo;
- ptlrpc_set_add_req(rqset, req);
- RETURN(0);
+ ptlrpc_set_add_req(rqset, req);
+ RETURN(0);
}
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
- struct obd_statfs *osfs, __u64 max_age, __u32 flags)
+ struct obd_statfs *osfs, time64_t max_age, __u32 flags)
{
- struct obd_device *obd = class_exp2obd(exp);
- struct obd_statfs *msfs;
- struct ptlrpc_request *req;
- struct obd_import *imp = NULL;
- int rc;
- ENTRY;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct obd_statfs *msfs;
+ struct ptlrpc_request *req;
+ struct obd_import *imp = NULL;
+ int rc;
+ ENTRY;
+
/*Since the request might also come from lprocfs, so we need
*sync this with client_disconnect_export Bug15684*/
if (!imp)
RETURN(-ENODEV);
- /* We could possibly pass max_age in the request (as an absolute
- * timestamp or a "seconds.usec ago") so the target can avoid doing
- * extra calls into the filesystem if that isn't necessary (e.g.
- * during mount that would help a bit). Having relative timestamps
- * is not so great if request processing is slow, while absolute
- * timestamps are not ideal because they need time synchronization. */
- req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
+ /* We could possibly pass max_age in the request (as an absolute
+ * timestamp or a "seconds.usec ago") so the target can avoid doing
+ * extra calls into the filesystem if that isn't necessary (e.g.
+ * during mount that would help a bit). Having relative timestamps
+ * is not so great if request processing is slow, while absolute
+ * timestamps are not ideal because they need time synchronization. */
+ req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
- class_import_put(imp);
+ class_import_put(imp);
- if (req == NULL)
- RETURN(-ENOMEM);
+ if (req == NULL)
+ RETURN(-ENOMEM);
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
- ptlrpc_request_set_replen(req);
- req->rq_request_portal = OST_CREATE_PORTAL;
- ptlrpc_at_set_req_timeout(req);
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OST_CREATE_PORTAL;
+ ptlrpc_at_set_req_timeout(req);
- if (flags & OBD_STATFS_NODELAY) {
- /* procfs requests not want stat in wait for avoid deadlock */
- req->rq_no_resend = 1;
- req->rq_no_delay = 1;
- }
+ if (flags & OBD_STATFS_NODELAY) {
+		/* procfs requests should not wait on stat, to avoid deadlock */
+ req->rq_no_resend = 1;
+ req->rq_no_delay = 1;
+ }
- rc = ptlrpc_queue_wait(req);
- if (rc)
- GOTO(out, rc);
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out, rc);
- msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
- if (msfs == NULL) {
- GOTO(out, rc = -EPROTO);
- }
+ msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
+ if (msfs == NULL)
+ GOTO(out, rc = -EPROTO);
- *osfs = *msfs;
+ *osfs = *msfs;
- EXIT;
- out:
- ptlrpc_req_finished(req);
- return rc;
+ EXIT;
+out:
+ ptlrpc_req_finished(req);
+ return rc;
}
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
void *karg, void __user *uarg)
{
- struct obd_device *obd = exp->exp_obd;
- struct obd_ioctl_data *data = karg;
- int err = 0;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct obd_ioctl_data *data = karg;
+ int rc = 0;
+ ENTRY;
if (!try_module_get(THIS_MODULE)) {
CERROR("%s: cannot get module '%s'\n", obd->obd_name,
module_name(THIS_MODULE));
return -EINVAL;
}
- switch (cmd) {
- case OBD_IOC_CLIENT_RECOVER:
- err = ptlrpc_recover_import(obd->u.cli.cl_import,
- data->ioc_inlbuf1, 0);
- if (err > 0)
- err = 0;
- GOTO(out, err);
- case IOC_OSC_SET_ACTIVE:
- err = ptlrpc_set_import_active(obd->u.cli.cl_import,
- data->ioc_offset);
- GOTO(out, err);
- case OBD_IOC_PING_TARGET:
- err = ptlrpc_obd_ping(obd);
- GOTO(out, err);
+ switch (cmd) {
+ case OBD_IOC_CLIENT_RECOVER:
+ rc = ptlrpc_recover_import(obd->u.cli.cl_import,
+ data->ioc_inlbuf1, 0);
+ if (rc > 0)
+ rc = 0;
+ break;
+ case IOC_OSC_SET_ACTIVE:
+ rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
+ data->ioc_offset);
+ break;
default:
- CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
- cmd, current_comm());
- GOTO(out, err = -ENOTTY);
+ rc = -ENOTTY;
+ CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
+ obd->obd_name, cmd, current_comm(), rc);
+ break;
}
-out:
+
module_put(THIS_MODULE);
- return err;
+ return rc;
}
-static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
- u32 keylen, void *key,
- u32 vallen, void *val,
- struct ptlrpc_request_set *set)
+int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
+ u32 keylen, void *key, u32 vallen, void *val,
+ struct ptlrpc_request_set *set)
{
struct ptlrpc_request *req;
struct obd_device *obd = exp->exp_obd;
RETURN(0);
}
- if (KEY_IS(KEY_CACHE_SET)) {
- struct client_obd *cli = &obd->u.cli;
-
- LASSERT(cli->cl_cache == NULL); /* only once */
- cli->cl_cache = (struct cl_client_cache *)val;
- cl_cache_incref(cli->cl_cache);
- cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
-
- /* add this osc into entity list */
- LASSERT(list_empty(&cli->cl_lru_osc));
- spin_lock(&cli->cl_cache->ccc_lru_lock);
- list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
- spin_unlock(&cli->cl_cache->ccc_lru_lock);
-
- RETURN(0);
- }
-
if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
struct client_obd *cli = &obd->u.cli;
long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
&RMF_OST_BODY :
&RMF_SETINFO_VAL);
- memcpy(tmp, val, vallen);
+ memcpy(tmp, val, vallen);
if (KEY_IS(KEY_GRANT_SHRINK)) {
- struct osc_grant_args *aa;
- struct obdo *oa;
-
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- OBDO_ALLOC(oa);
- if (!oa) {
- ptlrpc_req_finished(req);
- RETURN(-ENOMEM);
- }
- *oa = ((struct ost_body *)val)->oa;
- aa->aa_oa = oa;
- req->rq_interpret_reply = osc_shrink_grant_interpret;
- }
+ struct osc_grant_args *aa;
+ struct obdo *oa;
+
+ aa = ptlrpc_req_async_args(aa, req);
+ OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
+ if (!oa) {
+ ptlrpc_req_finished(req);
+ RETURN(-ENOMEM);
+ }
+ *oa = ((struct ost_body *)val)->oa;
+ aa->aa_oa = oa;
+ req->rq_interpret_reply = osc_shrink_grant_interpret;
+ }
ptlrpc_request_set_replen(req);
if (!KEY_IS(KEY_GRANT_SHRINK)) {
RETURN(0);
}
+EXPORT_SYMBOL(osc_set_info_async);
-static int osc_reconnect(const struct lu_env *env,
- struct obd_export *exp, struct obd_device *obd,
- struct obd_uuid *cluuid,
- struct obd_connect_data *data,
- void *localdata)
+int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
+ struct obd_device *obd, struct obd_uuid *cluuid,
+ struct obd_connect_data *data, void *localdata)
{
- struct client_obd *cli = &obd->u.cli;
+ struct client_obd *cli = &obd->u.cli;
- if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
- long lost_grant;
+ if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
+ long lost_grant;
long grant;
spin_lock(&cli->cl_loi_list_lock);
grant = cli->cl_avail_grant + cli->cl_reserved_grant;
- if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
+ if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
+ /* restore ocd_grant_blkbits as client page bits */
+ data->ocd_grant_blkbits = PAGE_SHIFT;
grant += cli->cl_dirty_grant;
- else
+ } else {
grant += cli->cl_dirty_pages << PAGE_SHIFT;
+ }
data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
lost_grant = cli->cl_lost_grant;
cli->cl_lost_grant = 0;
RETURN(0);
}
+EXPORT_SYMBOL(osc_reconnect);
-static int osc_disconnect(struct obd_export *exp)
+int osc_disconnect(struct obd_export *exp)
{
struct obd_device *obd = class_exp2obd(exp);
int rc;
- rc = client_disconnect_export(exp);
- /**
- * Initially we put del_shrink_grant before disconnect_export, but it
- * causes the following problem if setup (connect) and cleanup
- * (disconnect) are tangled together.
- * connect p1 disconnect p2
- * ptlrpc_connect_import
- * ............... class_manual_cleanup
- * osc_disconnect
- * del_shrink_grant
- * ptlrpc_connect_interrupt
- * init_grant_shrink
- * add this client to shrink list
- * cleanup_osc
- * Bang! pinger trigger the shrink.
- * So the osc should be disconnected from the shrink list, after we
- * are sure the import has been destroyed. BUG18662
- */
- if (obd->u.cli.cl_import == NULL)
- osc_del_shrink_grant(&obd->u.cli);
- return rc;
-}
-
-static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
- struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
+ rc = client_disconnect_export(exp);
+ /**
+ * Initially we put del_shrink_grant before disconnect_export, but it
+ * causes the following problem if setup (connect) and cleanup
+ * (disconnect) are tangled together.
+ * connect p1 disconnect p2
+ * ptlrpc_connect_import
+ * ............... class_manual_cleanup
+ * osc_disconnect
+ * del_shrink_grant
+ * ptlrpc_connect_interrupt
+ * osc_init_grant
+ * add this client to shrink list
+ * cleanup_osc
+	 * Bang! grant shrink thread triggers the shrink. BUG18662
+ */
+ osc_del_grant_list(&obd->u.cli);
+ return rc;
+}
+EXPORT_SYMBOL(osc_disconnect);
+
+int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *arg)
{
struct lu_env *env = arg;
struct ldlm_resource *res = cfs_hash_object(hs, hnode);
RETURN(0);
}
+EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
static int osc_import_event(struct obd_device *obd,
struct obd_import *imp,
break;
}
case IMP_EVENT_INACTIVE: {
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
break;
}
case IMP_EVENT_INVALIDATE: {
break;
}
case IMP_EVENT_ACTIVE: {
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
break;
}
case IMP_EVENT_OCD: {
if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
break;
}
case IMP_EVENT_DEACTIVATE: {
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
break;
}
case IMP_EVENT_ACTIVATE: {
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
break;
}
default:
* Cancel all unused and granted extent lock.
*/
if (lock->l_resource->lr_type == LDLM_EXTENT &&
- lock->l_granted_mode == lock->l_req_mode &&
+ ldlm_is_granted(lock) &&
osc_ldlm_weigh_ast(lock) == 0)
RETURN(1);
RETURN(0);
}
-int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
{
struct client_obd *cli = &obd->u.cli;
- struct obd_type *type;
- void *handler;
- int rc;
- int adding;
- int added;
- int req_count;
+ void *handler;
+ int rc;
+
ENTRY;
rc = ptlrpcd_addref();
if (rc)
GOTO(out_ptlrpcd, rc);
+
handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
if (IS_ERR(handler))
- GOTO(out_client_setup, rc = PTR_ERR(handler));
+ GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
cli->cl_writeback_work = handler;
handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
GOTO(out_ptlrpcd_work, rc);
cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+ osc_update_next_shrink(cli);
-#ifdef CONFIG_PROC_FS
- obd->obd_vars = lprocfs_osc_obd_vars;
-#endif
- /* If this is true then both client (osc) and server (osp) are on the
- * same node. The osp layer if loaded first will register the osc proc
- * directory. In that case this obd_device will be attached its proc
- * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
- type = class_search_type(LUSTRE_OSP_NAME);
- if (type && type->typ_procsym) {
- obd->obd_proc_entry = lprocfs_register(obd->obd_name,
- type->typ_procsym,
- obd->obd_vars, obd);
- if (IS_ERR(obd->obd_proc_entry)) {
- rc = PTR_ERR(obd->obd_proc_entry);
- CERROR("error %d setting up lprocfs for %s\n", rc,
- obd->obd_name);
- obd->obd_proc_entry = NULL;
- }
- } else {
- rc = lprocfs_obd_setup(obd);
- }
+ RETURN(rc);
- /* If the basic OSC proc tree construction succeeded then
- * lets do the rest. */
- if (rc == 0) {
- lproc_osc_attach_seqstat(obd);
- sptlrpc_lprocfs_cliobd_attach(obd);
- ptlrpc_lprocfs_register_obd(obd);
+out_ptlrpcd_work:
+ if (cli->cl_writeback_work != NULL) {
+ ptlrpcd_destroy_work(cli->cl_writeback_work);
+ cli->cl_writeback_work = NULL;
+ }
+ if (cli->cl_lru_work != NULL) {
+ ptlrpcd_destroy_work(cli->cl_lru_work);
+ cli->cl_lru_work = NULL;
}
+ client_obd_cleanup(obd);
+out_ptlrpcd:
+ ptlrpcd_decref();
+ RETURN(rc);
+}
+EXPORT_SYMBOL(osc_setup_common);
+
+int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+ struct client_obd *cli = &obd->u.cli;
+ int adding;
+ int added;
+ int req_count;
+ int rc;
+
+ ENTRY;
+
+ rc = osc_setup_common(obd, lcfg);
+ if (rc < 0)
+ RETURN(rc);
+
+ rc = osc_tunables_init(obd);
+ if (rc)
+ RETURN(rc);
/*
* We try to control the total number of requests with a upper limit
atomic_add(added, &osc_pool_req_count);
}
- INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
spin_lock(&osc_shrink_lock);
list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
spin_unlock(&osc_shrink_lock);
+ cli->cl_import->imp_idle_timeout = osc_idle_timeout;
+ cli->cl_import->imp_idle_debug = D_HA;
RETURN(0);
-
-out_ptlrpcd_work:
- if (cli->cl_writeback_work != NULL) {
- ptlrpcd_destroy_work(cli->cl_writeback_work);
- cli->cl_writeback_work = NULL;
- }
- if (cli->cl_lru_work != NULL) {
- ptlrpcd_destroy_work(cli->cl_lru_work);
- cli->cl_lru_work = NULL;
- }
-out_client_setup:
- client_obd_cleanup(obd);
-out_ptlrpcd:
- ptlrpcd_decref();
- RETURN(rc);
}
-static int osc_precleanup(struct obd_device *obd)
+int osc_precleanup_common(struct obd_device *obd)
{
struct client_obd *cli = &obd->u.cli;
ENTRY;
}
obd_cleanup_client_import(obd);
+ RETURN(0);
+}
+EXPORT_SYMBOL(osc_precleanup_common);
+
+static int osc_precleanup(struct obd_device *obd)
+{
+ ENTRY;
+
+ osc_precleanup_common(obd);
+
ptlrpc_lprocfs_unregister_obd(obd);
- lprocfs_obd_cleanup(obd);
RETURN(0);
}
-int osc_cleanup(struct obd_device *obd)
+int osc_cleanup_common(struct obd_device *obd)
{
struct client_obd *cli = &obd->u.cli;
int rc;
ptlrpcd_decref();
RETURN(rc);
}
+EXPORT_SYMBOL(osc_cleanup_common);
-int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
-{
- int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
- return rc > 0 ? 0: rc;
-}
-
-static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
-{
- return osc_process_config_base(obd, buf);
-}
-
-static struct obd_ops osc_obd_ops = {
+static const struct obd_ops osc_obd_ops = {
.o_owner = THIS_MODULE,
.o_setup = osc_setup,
.o_precleanup = osc_precleanup,
- .o_cleanup = osc_cleanup,
+ .o_cleanup = osc_cleanup_common,
.o_add_conn = client_import_add_conn,
.o_del_conn = client_import_del_conn,
- .o_connect = client_connect_import,
+ .o_connect = client_connect_import,
.o_reconnect = osc_reconnect,
.o_disconnect = osc_disconnect,
.o_statfs = osc_statfs,
.o_iocontrol = osc_iocontrol,
.o_set_info_async = osc_set_info_async,
.o_import_event = osc_import_event,
- .o_process_config = osc_process_config,
.o_quotactl = osc_quotactl,
};
static struct shrinker *osc_cache_shrinker;
-struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
+LIST_HEAD(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
#ifndef HAVE_SHRINKER_COUNT
.nr_to_scan = shrink_param(sc, nr_to_scan),
.gfp_mask = shrink_param(sc, gfp_mask)
};
-#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
- struct shrinker *shrinker = NULL;
-#endif
-
(void)osc_cache_shrink_scan(shrinker, &scv);
return osc_cache_shrink_count(shrinker, &scv);
static int __init osc_init(void)
{
- bool enable_proc = true;
- struct obd_type *type;
unsigned int reqpool_size;
unsigned int reqsize;
int rc;
if (rc)
RETURN(rc);
- type = class_search_type(LUSTRE_OSP_NAME);
- if (type != NULL && type->typ_procsym != NULL)
- enable_proc = false;
-
- rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
+ rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
LUSTRE_OSC_NAME, &osc_device_type);
if (rc)
GOTO(out_kmem, rc);
osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
ptlrpc_add_rqs_to_pool);
- if (osc_rq_pool != NULL)
- GOTO(out, rc);
- rc = -ENOMEM;
+ if (osc_rq_pool == NULL)
+ GOTO(out_type, rc = -ENOMEM);
+
+ rc = osc_start_grant_work();
+ if (rc != 0)
+ GOTO(out_req_pool, rc);
+
+ RETURN(rc);
+
+out_req_pool:
+ ptlrpc_free_rq_pool(osc_rq_pool);
out_type:
class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
lu_kmem_fini(osc_caches);
-out:
+
RETURN(rc);
}
static void __exit osc_exit(void)
{
+ osc_stop_grant_work();
remove_shrinker(osc_cache_shrinker);
class_unregister_type(LUSTRE_OSC_NAME);
lu_kmem_fini(osc_caches);