/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2001-2003 Cluster File Systems, Inc.
- * Author Peter Braam <braam@clusterfs.com>
+ * GPL HEADER START
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
- * For testing and management it is treated as an obd_device,
- * although * it does not export a full OBD method table (the
- * requests are coming * in over the wire, so object target modules
- * do not have a full * method table.)
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
*
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
#ifndef EXPORT_SYMTAB
extern quota_interface_t osc_quota_interface;
/* by default 10s */
-atomic_t osc_resend_time;
+atomic_t osc_resend_time;
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
{
struct ptlrpc_request *req;
struct ost_body *body;
- int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
struct osc_async_args *aa;
ENTRY;
req->rq_interpret_reply = osc_getattr_interpret;
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = (struct osc_async_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
aa->aa_oi = oinfo;
ptlrpc_set_add_req(set, req);
{
struct ptlrpc_request *req;
struct ost_body *body;
- int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ int rc;
ENTRY;
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
{
struct ptlrpc_request *req;
struct ost_body *body;
- int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ int rc;
ENTRY;
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
{
struct ptlrpc_request *req;
struct ost_body *body;
- int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
+ int bufcount = 2;
struct osc_async_args *aa;
ENTRY;
+ if (osc_exp_is_2_0_server(exp)) {
+ bufcount = 3;
+ }
+
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
- OST_SETATTR, 2, size, NULL);
+ OST_SETATTR, bufcount, size, NULL);
if (!req)
RETURN(-ENOMEM);
req->rq_interpret_reply = osc_setattr_interpret;
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = (struct osc_async_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
aa->aa_oi = oinfo;
ptlrpc_set_add_req(rqset, req);
struct ptlrpc_request *req;
struct ost_body *body;
struct lov_stripe_md *lsm;
- int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ int rc;
ENTRY;
LASSERT(oa);
struct ptlrpc_request *req;
struct osc_async_args *aa;
struct ost_body *body;
- int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
ENTRY;
if (!oinfo->oi_oa) {
req->rq_interpret_reply = osc_punch_interpret;
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = (struct osc_async_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
aa->aa_oi = oinfo;
ptlrpc_set_add_req(rqset, req);
RETURN(0);
}
-static int osc_sync(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, obd_size start, obd_size end)
+/* Completion callback for an asynchronous OST_SYNC request: unpack the
+ * reply ost_body, copy the returned obdo back into the caller's obd_info
+ * and hand the final status to the caller via oi_cb_up().
+ * Returns the value produced by the oi_cb_up() callback. */
+static int osc_sync_interpret(struct ptlrpc_request *req,
+ struct osc_async_args *aa, int rc)
+{
+ struct ost_body *body;
+ ENTRY;
+
+ if (rc)
+ GOTO(out, rc);
+
+ /* NULL means the reply buffer was missing or too small to hold
+ * an ost_body — treat as a protocol error */
+ body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL) {
+ CERROR ("can't unpack ost_body\n");
+ GOTO(out, rc = -EPROTO);
+ }
+
+ *aa->aa_oi->oi_oa = body->oa;
+out:
+ /* notify the upper layer on both success and failure paths */
+ rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+ RETURN(rc);
+}
+
+static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
+ obd_size start, obd_size end,
+ struct ptlrpc_request_set *set)
{
struct ptlrpc_request *req;
struct ost_body *body;
- int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ struct osc_async_args *aa;
ENTRY;
- if (!oa) {
+ if (!oinfo->oi_oa) {
CERROR("oa NULL\n");
RETURN(-EINVAL);
}
RETURN(-ENOMEM);
body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
- memcpy(&body->oa, oa, sizeof(*oa));
+ memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
/* overload the size and blocks fields in the oa with start/end */
body->oa.o_size = start;
body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
ptlrpc_req_set_repsize(req, 2, size);
+ req->rq_interpret_reply = osc_sync_interpret;
- rc = ptlrpc_queue_wait(req);
- if (rc)
- GOTO(out, rc);
-
- body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR ("can't unpack ost_body\n");
- GOTO (out, rc = -EPROTO);
- }
-
- memcpy(oa, &body->oa, sizeof(*oa));
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = ptlrpc_req_async_args(req);
+ aa->aa_oi = oinfo;
- EXIT;
- out:
- ptlrpc_req_finished(req);
- return rc;
+ ptlrpc_set_add_req(set, req);
+ RETURN (0);
}
/* Find and cancel locally locks matched by @mode in the resource found by
* @objid. Found locks are added into @cancel list. Returns the amount of
* locks added to @cancels list. */
-static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
+static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
struct list_head *cancels, ldlm_mode_t mode,
int lock_flags)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
- struct ldlm_res_id res_id = { .name = { objid } };
- struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+ struct ldlm_res_id res_id;
+ struct ldlm_resource *res;
int count;
ENTRY;
+ osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
+ res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
if (res == NULL)
RETURN(0);
CFS_LIST_HEAD(cancels);
struct ptlrpc_request *req;
struct ost_body *body;
- int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
+ __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
sizeof(struct ldlm_request) };
int count, bufcount = 2;
struct client_obd *cli = &exp->exp_obd->u.cli;
LASSERT(oa->o_id != 0);
- count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
+ count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
LDLM_FL_DISCARD_DATA);
if (exp_connect_cancelset(exp))
bufcount = 3;
if (pga[i]->count > nob_read) {
/* EOF inside this page */
- ptr = cfs_kmap(pga[i]->pg) +
+ ptr = cfs_kmap(pga[i]->pg) +
(pga[i]->off & ~CFS_PAGE_MASK);
memset(ptr + nob_read, 0, pga[i]->count - nob_read);
cfs_kunmap(pga[i]->pg);
if (req->rq_bulk->bd_nob_transferred != requested_nob) {
CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
- requested_nob, req->rq_bulk->bd_nob_transferred);
+ req->rq_bulk->bd_nob_transferred, requested_nob);
return(-EPROTO);
}
struct ost_body *body;
struct obd_ioobj *ioobj;
struct niobuf_remote *niobuf;
- int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
int niocount, i, requested_nob, opc, rc;
struct ptlrpc_request_pool *pool;
struct osc_brw_async_args *aa;
LASSERTF((void *)(niobuf - niocount) ==
lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
niocount * sizeof(*niobuf)),
- "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
- REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
+ "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
+ REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
(void *)(niobuf - niocount));
osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
}
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = (struct osc_brw_async_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
aa->aa_oa = oa;
aa->aa_requested_nob = requested_nob;
aa->aa_nio_count = niocount;
"["LPU64"-"LPU64"]\n",
msg, libcfs_nid2str(peer->nid),
oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
- oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
+ oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
(__u64)0,
oa->o_id,
oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
- struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+ struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
const lnet_process_id_t *peer =
&req->rq_import->imp_connection->c_peer;
struct client_obd *cli = aa->aa_cli;
if (server_cksum == ~0 && rc > 0) {
CERROR("Protocol error: server %s set the 'checksum' "
"bit, but didn't send a checksum. Not fatal, "
- "but please tell CFS.\n",
+ "but please notify on http://bugzilla.lustre.org/\n",
libcfs_nid2str(peer->nid));
} else if (server_cksum != client_cksum) {
LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
new_req->rq_async_args = request->rq_async_args;
new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
- new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
+ new_aa = ptlrpc_req_async_args(new_req);
CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
}
}
- /* use ptlrpc_set_add_req is safe because interpret functions work
- * in check_set context. only one way exist with access to request
- * from different thread got -EINTR - this way protected with
+ /* use ptlrpc_set_add_req is safe because interpret functions work
+ * in check_set context. only one way exist with access to request
+ * from different thread got -EINTR - this way protected with
* cl_loi_list_lock */
ptlrpc_set_add_req(set, new_req);
rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
page_count, pga, &request);
- aa = (struct osc_brw_async_args *)&request->rq_async_args;
+ CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
+ aa = ptlrpc_req_async_args(request);
if (cmd == OBD_BRW_READ) {
lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
- ptlrpc_lprocfs_brw(request, OST_READ, aa->aa_requested_nob);
} else {
lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
lprocfs_oh_tally(&cli->cl_write_rpc_hist,
cli->cl_w_in_flight);
- ptlrpc_lprocfs_brw(request, OST_WRITE, aa->aa_requested_nob);
}
+ ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
+
LASSERT(list_empty(&aa->aa_oaps));
if (rc == 0) {
RETURN(0);
}
+/* Return 1 if @lop has a high-priority (ASYNC_HP) page queued and thus
+ * should force an RPC immediately, 0 otherwise.  Only the first entry of
+ * lop_urgent needs checking because ASYNC_HP pages are inserted at the
+ * head of that list (see the list_add() vs list_add_tail() split where
+ * urgent pages are queued). */
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+ struct osc_async_page *oap;
+ ENTRY;
+
+ if (list_empty(&lop->lop_urgent))
+ RETURN(0);
+
+ oap = list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+
+ if (oap->oap_async_flags & ASYNC_HP) {
+ CDEBUG(D_CACHE, "hp request forcing RPC\n");
+ RETURN(1);
+ }
+
+ RETURN(0);
+}
+
static void on_list(struct list_head *item, struct list_head *list,
int should_be_on)
{
* can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
- on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ if (lop_makes_hprpc(&loi->loi_write_lop) ||
+ lop_makes_hprpc(&loi->loi_read_lop)) {
+ /* HP rpc */
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+ } else {
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+ lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ }
on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
loi->loi_write_lop.lop_num_pending);
else
lop = &oap->oap_loi->loi_read_lop;
- if (oap->oap_async_flags & ASYNC_URGENT)
+ if (oap->oap_async_flags & ASYNC_HP)
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ else if (oap->oap_async_flags & ASYNC_URGENT)
+ list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
void *caller_data = NULL;
struct osc_async_page *oap;
struct ldlm_lock *lock = NULL;
+ obd_valid valid;
int i, rc;
ENTRY;
CERROR("prep_req failed: %d\n", rc);
GOTO(out, req = ERR_PTR(rc));
}
+ oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
+ sizeof(struct ost_body)))->oa;
/* Need to update the timestamps after the request is built in case
* we race with setattr (locally or in queue at OST). If OST gets
* later setattr before earlier BRW (as determined by the request xid),
* the OST will not use BRW timestamps. Sadly, there is no obvious
* way to do this in a single call. bug 10150 */
- ops->ap_update_obdo(caller_data, cmd, oa,
- OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
+ if (pga[0]->flag & OBD_BRW_SRVLOCK) {
+ /* in case of lockless read/write do not use inode's
+ * timestamps because concurrent stat might fill the
+ * inode with out-of-date times, send current
+ * instead */
+ if (cmd & OBD_BRW_WRITE) {
+ oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
+ oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+ valid = OBD_MD_FLATIME;
+ } else {
+ oa->o_atime = LTIME_S(CURRENT_TIME);
+ oa->o_valid |= OBD_MD_FLATIME;
+ valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+ }
+ } else {
+ valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
+ }
+ ops->ap_update_obdo(caller_data, cmd, oa, valid);
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = (struct osc_brw_async_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
CFS_INIT_LIST_HEAD(&aa->aa_oaps);
list_splice(rpc_list, &aa->aa_oaps);
CFS_INIT_LIST_HEAD(rpc_list);
int srvlock = 0;
ENTRY;
+ /* If there are HP OAPs we need to handle at least 1 of them,
+ * move it the beginning of the pending list for that. */
+ if (!list_empty(&lop->lop_urgent)) {
+ oap = list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ if (oap->oap_async_flags & ASYNC_HP)
+ list_move(&oap->oap_pending_item, &lop->lop_pending);
+ }
+
/* first we find the pages we're allowed to work with */
list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
ops = oap->oap_caller_ops;
RETURN(PTR_ERR(req));
}
- aa = (struct osc_brw_async_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
if (cmd == OBD_BRW_READ) {
lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
(starting_offset >> CFS_PAGE_SHIFT) + 1);
- ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
} else {
lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
lprocfs_oh_tally(&cli->cl_write_rpc_hist,
cli->cl_w_in_flight);
lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
(starting_offset >> CFS_PAGE_SHIFT) + 1);
- ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
}
+ ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
client_obd_list_lock(&cli->cl_loi_list_lock);
#define LOI_DEBUG(LOI, STR, args...) \
CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
- !list_empty(&(LOI)->loi_cli_item), \
+ !list_empty(&(LOI)->loi_ready_item) || \
+ !list_empty(&(LOI)->loi_hp_ready_item), \
(LOI)->loi_write_lop.lop_num_pending, \
!list_empty(&(LOI)->loi_write_lop.lop_urgent), \
(LOI)->loi_read_lop.lop_num_pending, \
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
ENTRY;
- /* first return all objects which we already know to have
- * pages ready to be stuffed into rpcs */
+ /* First return objects that have blocked locks so that they
+ * will be flushed quickly and other clients can get the lock,
+ * then objects which have pages ready to be stuffed into RPCs */
+ if (!list_empty(&cli->cl_loi_hp_ready_list))
+ RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+ struct lov_oinfo, loi_hp_ready_item));
if (!list_empty(&cli->cl_loi_ready_list))
RETURN(list_entry(cli->cl_loi_ready_list.next,
- struct lov_oinfo, loi_cli_item));
+ struct lov_oinfo, loi_ready_item));
/* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
RETURN(NULL);
}
+/* Check whether @cli has reached its RPC-in-flight limit for @loi.
+ * A queued high-priority (ASYNC_HP) page raises the effective limit by
+ * one so that a blocked lock can still be flushed when the normal limit
+ * is already consumed.  HP pages are inserted at the head of lop_urgent,
+ * so inspecting the first entry of each list is sufficient.
+ * Returns non-zero when no further RPC should be started. */
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+ struct osc_async_page *oap;
+ int hprpc = 0;
+
+ if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+ oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+ /* bug fix: examine the read urgent list here — the original
+ * copy-pasted loi_write_lop, so a HP read page never earned
+ * the extra slot and the entry read did not match the
+ * list_empty() guard above it */
+ oap = list_entry(loi->loi_read_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
while ((loi = osc_next_loi(cli)) != NULL) {
LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
- if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+ if (osc_max_rpc_in_flight(cli, loi))
break;
/* attempt some read/write balancing by alternating between
/* attempt some inter-object balancing by issueing rpcs
* for each object in turn */
- if (!list_empty(&loi->loi_cli_item))
- list_del_init(&loi->loi_cli_item);
+ if (!list_empty(&loi->loi_hp_ready_item))
+ list_del_init(&loi->loi_hp_ready_item);
+ if (!list_empty(&loi->loi_ready_item))
+ list_del_init(&loi->loi_ready_item);
if (!list_empty(&loi->loi_write_item))
list_del_init(&loi->loi_write_item);
if (!list_empty(&loi->loi_read_item))
spin_lock_init(&oap->oap_lock);
- /* If the page was marked as notcacheable - don't add to any locks */
+ /* If the page was marked as notcacheable - don't add to any locks */
if (!nocache) {
- oid.name[0] = loi->loi_id;
+ osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
/* This is the only place where we can call cache_add_extent
without oap_lock, because this page is locked now, and
the lock we are adding it to is referenced, so cannot lose
RETURN(-EBUSY);
/* check if the file's owner/group is over quota */
-#ifdef HAVE_QUOTA_SUPPORT
if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
struct obd_async_page_ops *ops;
struct obdo *oa;
if (rc)
RETURN(rc);
}
-#endif
if (loi == NULL)
loi = lsm->lsm_oinfo[0];
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
oap->oap_async_flags |= ASYNC_READY;
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
- if (list_empty(&oap->oap_rpc_item)) {
+ if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+ list_empty(&oap->oap_rpc_item)) {
+ if (oap->oap_async_flags & ASYNC_HP)
list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- loi_list_maint(cli, loi);
- }
+ else
+ list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+ oap->oap_async_flags |= ASYNC_URGENT;
+ loi_list_maint(cli, loi);
}
LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
if (!list_empty(&oap->oap_urgent_item)) {
list_del_init(&oap->oap_urgent_item);
- oap->oap_async_flags &= ~ASYNC_URGENT;
+ oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
}
+
if (!list_empty(&oap->oap_pending_item)) {
list_del_init(&oap->oap_pending_item);
lop_update_pending(cli, lop, oap->oap_cmd, -1);
{
struct lustre_handle lockh = { 0 };
int rc;
- ENTRY;
-
+ ENTRY;
+
if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
- LBUG();
- }
+ LBUG();
+ }
switch (flag) {
case LDLM_CB_BLOCKING:
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
ldlm_iterator_t replace, void *data)
{
- struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+ struct ldlm_res_id res_id;
struct obd_device *obd = class_exp2obd(exp);
+ osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
return 0;
}
struct ldlm_enqueue_info *einfo,
struct ptlrpc_request_set *rqset)
{
- struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
+ struct ldlm_res_id res_id;
struct obd_device *obd = exp->exp_obd;
struct ldlm_reply *rep;
struct ptlrpc_request *req = NULL;
int rc;
ENTRY;
+ osc_build_res_name(oinfo->oi_md->lsm_object_id,
+ oinfo->oi_md->lsm_object_gr, &res_id);
/* Filesystem lock extents are extended to page boundaries so that
* dealing with the page cache is a little smoother. */
oinfo->oi_policy.l_extent.start -=
no_match:
if (intent) {
- int size[3] = {
+ __u32 size[3] = {
[MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
[DLM_LOCKREQ_OFF + 1] = 0 };
RETURN(-ENOMEM);
size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
- size[DLM_REPLY_REC_OFF] =
+ size[DLM_REPLY_REC_OFF] =
sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
ptlrpc_req_set_repsize(req, 3, size);
}
if (!rc) {
struct osc_enqueue_args *aa;
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = (struct osc_enqueue_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
aa->oa_oi = oinfo;
aa->oa_ei = einfo;
aa->oa_exp = exp;
__u32 type, ldlm_policy_data_t *policy, __u32 mode,
int *flags, void *data, struct lustre_handle *lockh)
{
- struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+ struct ldlm_res_id res_id;
struct obd_device *obd = exp->exp_obd;
int lflags = *flags;
ldlm_mode_t rc;
ENTRY;
+ osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
+
OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
/* Filesystem lock extents are extended to page boundaries so that
struct lov_stripe_md *lsm, int flags, void *opaque)
{
struct obd_device *obd = class_exp2obd(exp);
- struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+ struct ldlm_res_id res_id, *resp = NULL;
+
+ if (lsm != NULL) {
+ resp = osc_build_res_name(lsm->lsm_object_id,
+ lsm->lsm_object_gr, &res_id);
+ }
+
+ return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
- return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
- opaque);
}
static int osc_join_lru(struct obd_export *exp,
struct lov_stripe_md *lsm, int join)
{
struct obd_device *obd = class_exp2obd(exp);
- struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+ struct ldlm_res_id res_id, *resp = NULL;
+
+ if (lsm != NULL) {
+ resp = osc_build_res_name(lsm->lsm_object_id,
+ lsm->lsm_object_gr, &res_id);
+ }
+
+ return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
- return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
}
static int osc_statfs_interpret(struct ptlrpc_request *req,
{
struct ptlrpc_request *req;
struct osc_async_args *aa;
- int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
ENTRY;
/* We could possibly pass max_age in the request (as an absolute
req->rq_interpret_reply = osc_statfs_interpret;
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = (struct osc_async_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
aa->aa_oi = oinfo;
ptlrpc_set_add_req(rqset, req);
struct obd_statfs *msfs;
struct ptlrpc_request *req;
struct obd_import *imp = NULL;
- int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
+ int rc;
ENTRY;
- /*Since the request might also come from lprocfs, so we need
+ /*Since the request might also come from lprocfs, so we need
*sync this with client_disconnect_export Bug15684*/
down_read(&obd->u.cli.cl_sem);
if (obd->u.cli.cl_import)
up_read(&obd->u.cli.cl_sem);
if (!imp)
RETURN(-ENODEV);
-
+
/* We could possibly pass max_age in the request (as an absolute
* timestamp or a "seconds.usec ago") so the target can avoid doing
* extra calls into the filesystem if that isn't necessary (e.g.
*
* @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
* the maximum number of OST indices which will fit in the user buffer.
- * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
+ * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
*/
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
- struct lov_user_md lum, *lumk;
+ /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
+ struct lov_user_md_v3 lum, *lumk;
int rc = 0, lum_size;
+ struct lov_user_ost_data_v1 *lmm_objects;
ENTRY;
if (!lsm)
RETURN(-ENODATA);
- if (copy_from_user(&lum, lump, sizeof(lum)))
+ /* we only need the header part from user space to get lmm_magic and
+ * lmm_stripe_count, (the header part is common to v1 and v3) */
+ lum_size = sizeof(struct lov_user_md_v1);
+ if (copy_from_user(&lum, lump, lum_size))
RETURN(-EFAULT);
- if (lum.lmm_magic != LOV_USER_MAGIC)
+ if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
+ (lum.lmm_magic != LOV_USER_MAGIC_V3))
RETURN(-EINVAL);
+ /* lov_user_md_vX and lov_mds_md_vX must have the same size */
+ LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
+ LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
+ LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
+
+ /* we can use lov_mds_md_size() to compute lum_size
+ * because lov_user_md_vX and lov_mds_md_vX have the same size */
if (lum.lmm_stripe_count > 0) {
- lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
+ lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
OBD_ALLOC(lumk, lum_size);
if (!lumk)
RETURN(-ENOMEM);
-
- lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
+ if (lum.lmm_magic == LOV_USER_MAGIC_V1)
+ lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
+ else
+ lmm_objects = &(lumk->lmm_objects[0]);
+ lmm_objects->l_object_id = lsm->lsm_object_id;
} else {
- lum_size = sizeof(lum);
+ lum_size = lov_mds_md_size(0, lum.lmm_magic);
lumk = &lum;
}
int err = 0;
ENTRY;
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- MOD_INC_USE_COUNT;
-#else
if (!try_module_get(THIS_MODULE)) {
CERROR("Can't get module. Is it alive?");
return -EINVAL;
}
-#endif
switch (cmd) {
case OBD_IOC_LOV_GET_CONFIG: {
char *buf;
case OBD_IOC_DESTROY: {
struct obdo *oa;
- if (!capable (CAP_SYS_ADMIN))
+ if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO (out, err = -EPERM);
oa = &data->ioc_obdo1;
err = osc_destroy(exp, oa, NULL, NULL, NULL);
GOTO(out, err);
}
+ case OBD_IOC_PING_TARGET:
+ err = ptlrpc_obd_ping(obd);
+ GOTO(out, err);
default:
CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
cmd, cfs_curproc_comm());
GOTO(out, err = -ENOTTY);
}
out:
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- MOD_DEC_USE_COUNT;
-#else
module_put(THIS_MODULE);
-#endif
return err;
}
static int osc_get_info(struct obd_export *exp, obd_count keylen,
- void *key, __u32 *vallen, void *val)
+ void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
{
ENTRY;
if (!vallen || !val)
struct ptlrpc_request *req;
obd_id *reply;
char *bufs[2] = { NULL, key };
- int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
+ int rc;
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
OST_GET_INFO, 2, size, bufs);
out:
ptlrpc_req_finished(req);
RETURN(rc);
+ } else if (KEY_IS(KEY_FIEMAP)) {
+ struct ptlrpc_request *req;
+ struct ll_user_fiemap *reply;
+ char *bufs[2] = { NULL, key };
+ __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
+ int rc;
+
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+ OST_GET_INFO, 2, size, bufs);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ size[REPLY_REC_OFF] = *vallen;
+ ptlrpc_req_set_repsize(req, 2, size);
+
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out1, rc);
+ reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
+ lustre_swab_fiemap);
+ if (reply == NULL) {
+ CERROR("Can't unpack FIEMAP reply.\n");
+ GOTO(out1, rc = -EPROTO);
+ }
+
+ memcpy(val, reply, *vallen);
+
+ out1:
+ ptlrpc_req_finished(req);
+
+ RETURN(rc);
}
+
RETURN(-EINVAL);
}
struct ptlrpc_request *req;
struct obd_device *obd = exp->exp_obd;
struct obd_import *imp = class_exp2cliimp(exp);
- int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
+ __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
char *bufs[3] = { NULL, key, val };
ENTRY;
static struct llog_operations osc_mds_ost_orig_logops;
static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *catid,
+ int count, struct llog_catid *catid,
struct obd_uuid *uuid)
{
int rc;
rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
&osc_size_repl_logops);
- if (rc)
+ if (rc) {
+ struct llog_ctxt *ctxt =
+ llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+ if (ctxt)
+ llog_cleanup(ctxt);
CERROR("failed LLOG_SIZE_REPL_CTXT\n");
+ }
out:
if (rc) {
- CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
+ CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
obd->obd_name, tgt->obd_name, count, catid, rc);
CERROR("logid "LPX64":0x%x\n",
catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
struct obd_uuid *cluuid,
- struct obd_connect_data *data)
+ struct obd_connect_data *data,
+ void *localdata)
{
struct client_obd *cli = &obd->u.cli;
static int osc_disconnect(struct obd_export *exp)
{
struct obd_device *obd = class_exp2obd(exp);
- struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
+ struct llog_ctxt *ctxt;
int rc;
- if (obd->u.cli.cl_conn_count == 1)
- /* flush any remaining cancel messages out to the target */
- llog_sync(ctxt, exp);
-
- llog_ctxt_put(ctxt);
+ ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
+ if (ctxt) {
+ if (obd->u.cli.cl_conn_count == 1) {
+ /* Flush any remaining cancel messages out to the
+ * target */
+ llog_sync(ctxt, exp);
+ }
+ llog_ctxt_put(ctxt);
+ } else {
+ CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
+ obd);
+ }
rc = client_disconnect_export(exp);
return rc;
client import will not have been cleaned. */
if (obd->u.cli.cl_import) {
struct obd_import *imp;
+ down_write(&obd->u.cli.cl_sem);
imp = obd->u.cli.cl_import;
CDEBUG(D_CONFIG, "%s: client import never connected\n",
obd->obd_name);
ptlrpc_invalidate_import(imp);
- ptlrpc_free_rq_pool(imp->imp_rq_pool);
+ if (imp->imp_rq_pool) {
+ ptlrpc_free_rq_pool(imp->imp_rq_pool);
+ imp->imp_rq_pool = NULL;
+ }
class_destroy_import(imp);
+ up_write(&obd->u.cli.cl_sem);
obd->u.cli.cl_import = NULL;
}
rc = obd_llog_finish(obd, 0);
class_unregister_type(LUSTRE_OSC_NAME);
}
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");