1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
/* File-scope state and forward declarations for the OSC request layer. */
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
/* NOTE(review): the next two declarations duplicate the pair above.
 * A repeated `static` tentative definition and a repeated `extern` are
 * legal C, but redundant -- consider removing one pair (confirm against
 * the full file; lines are missing from this extract). */
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
78 atomic_t osc_resend_time;
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82 struct lov_stripe_md *lsm)
/* Single-stripe lov_mds_md size used for both alloc and free below. */
87 lmm_size = sizeof(**lmmp);
/* Free path: release a previously packed buffer (presumably taken when
 * the caller passes no lsm -- TODO confirm, guard lines missing here). */
92 OBD_FREE(*lmmp, lmm_size);
/* Alloc path: hand the caller a freshly allocated wire/disk buffer. */
98 OBD_ALLOC(*lmmp, lmm_size);
/* Object id must be non-zero; stored little-endian on disk. */
104 LASSERT(lsm->lsm_object_id);
105 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113 struct lov_mds_md *lmm, int lmm_bytes)
/* Validate the incoming buffer before touching its fields. */
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == 0) {
127 CERROR("lov_mds_md: zero lmm_object_id\n");
/* In-memory size for a single-stripe lov_stripe_md. */
132 lsm_size = lov_stripe_md_size(1);
/* Free path: caller supplied an lsm but no source buffer. */
136 if (*lsmp != NULL && lmm == NULL) {
137 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 OBD_FREE(*lsmp, lsm_size);
/* Alloc path: build lsm plus its single oinfo; unwind lsm if the
 * oinfo allocation fails. */
144 OBD_ALLOC(*lsmp, lsm_size);
147 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149 OBD_FREE(*lsmp, lsm_size);
152 loi_init((*lsmp)->lsm_oinfo[0]);
156 /* XXX zero *lsmp? */
157 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158 LASSERT((*lsmp)->lsm_object_id);
161 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
/* Async completion for OST_GETATTR: unpack the reply ost_body and push
 * the attributes up through the obd_info completion callback. */
169 struct ost_body *body;
170 struct osc_async_args *aa = data;
176 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
177 lustre_swab_ost_body);
179 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
182 /* This should really be sent by the OST */
183 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
184 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack failure: invalidate the obdo so callers don't read stale bits. */
186 CERROR("can't unpack ost_body\n");
188 aa->aa_oi->oi_oa->o_valid = 0;
/* Hand the final rc to the caller's up-call. */
191 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
195 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
196 struct ptlrpc_request_set *set)
/* Queue a non-blocking OST_GETATTR on @set; completion handled by
 * osc_getattr_interpret. */
198 struct ptlrpc_request *req;
199 struct ost_body *body;
200 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
201 struct osc_async_args *aa;
204 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
205 OST_GETATTR, 2, size,NULL);
/* Pack the caller's obdo into the request body. */
209 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
210 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
212 ptlrpc_req_set_repsize(req, 2, size);
213 req->rq_interpret_reply = osc_getattr_interpret;
/* Async args must fit in the request's embedded scratch space. */
215 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
216 aa = ptlrpc_req_async_args(req);
219 ptlrpc_set_add_req(set, req);
223 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
/* Synchronous OST_GETATTR: send, wait, then unpack the returned
 * attributes into oinfo->oi_oa. */
225 struct ptlrpc_request *req;
226 struct ost_body *body;
227 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
231 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
232 OST_GETATTR, 2, size, NULL);
236 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
237 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
239 ptlrpc_req_set_repsize(req, 2, size);
/* Blocks until the reply (or an error) arrives. */
241 rc = ptlrpc_queue_wait(req);
243 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
247 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
248 lustre_swab_ost_body);
250 CERROR ("can't unpack ost_body\n");
251 GOTO (out, rc = -EPROTO);
254 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
255 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
257 /* This should really be sent by the OST */
258 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
259 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Common exit: drop our reference to the request. */
263 ptlrpc_req_finished(req);
267 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
268 struct obd_trans_info *oti)
/* Synchronous OST_SETATTR; on success the reply obdo is copied back
 * into oinfo->oi_oa. */
270 struct ptlrpc_request *req;
271 struct ost_body *body;
272 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
276 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
277 OST_SETATTR, 2, size, NULL);
281 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
282 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
284 ptlrpc_req_set_repsize(req, 2, size);
286 rc = ptlrpc_queue_wait(req);
290 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
291 lustre_swab_ost_body);
/* Missing/garbled reply body is a protocol error. */
293 GOTO(out, rc = -EPROTO);
295 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
299 ptlrpc_req_finished(req);
303 static int osc_setattr_interpret(struct ptlrpc_request *req,
/* Async completion for OST_SETATTR: unpack the reply and notify the
 * caller through the obd_info up-call. */
306 struct ost_body *body;
307 struct osc_async_args *aa = data;
313 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
314 lustre_swab_ost_body);
316 CERROR("can't unpack ost_body\n");
317 GOTO(out, rc = -EPROTO);
320 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
322 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
326 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
327 struct obd_trans_info *oti,
328 struct ptlrpc_request_set *rqset)
/* Asynchronous OST_SETATTR. With a NULL rqset the request is fired via
 * ptlrpcd and forgotten; otherwise it joins @rqset with
 * osc_setattr_interpret as the completion handler. */
330 struct ptlrpc_request *req;
331 struct ost_body *body;
332 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
334 struct osc_async_args *aa;
/* 2.0 servers take a different buffer layout -- handled here
 * (details elided from this extract). */
337 if (osc_exp_is_2_0_server(exp)) {
341 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
342 OST_SETATTR, bufcount, size, NULL);
346 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
/* Carry the MDS llog cookie along with the setattr when present. */
348 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
350 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
353 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
354 ptlrpc_req_set_repsize(req, 2, size);
355 /* do mds to ost setattr asynchronouly */
357 /* Do not wait for response. */
358 ptlrpcd_add_req(req);
360 req->rq_interpret_reply = osc_setattr_interpret;
362 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
363 aa = ptlrpc_req_async_args(req);
366 ptlrpc_set_add_req(rqset, req);
372 int osc_real_create(struct obd_export *exp, struct obdo *oa,
373 struct lov_stripe_md **ea, struct obd_trans_info *oti)
/* Create an object on the OST synchronously. Allocates a stripe md,
 * sends OST_CREATE, and on success records the new object id and any
 * llog cookie/transno for the caller. */
375 struct ptlrpc_request *req;
376 struct ost_body *body;
377 struct lov_stripe_md *lsm;
378 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
387 rc = obd_alloc_memmd(exp, &lsm);
392 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
393 OST_CREATE, 2, size, NULL);
395 GOTO(out, rc = -ENOMEM);
397 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
398 lustre_set_wire_obdo(&body->oa, oa);
400 ptlrpc_req_set_repsize(req, 2, size);
/* DELORPHAN (orphan cleanup) creates must not be replayed/delayed. */
401 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
402 oa->o_flags == OBD_FL_DELORPHAN) {
404 "delorphan from OST integration");
405 /* Don't resend the delorphan req */
406 req->rq_no_resend = req->rq_no_delay = 1;
409 rc = ptlrpc_queue_wait(req);
413 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
414 lustre_swab_ost_body);
416 CERROR ("can't unpack ost_body\n");
417 GOTO (out_req, rc = -EPROTO);
420 lustre_get_wire_obdo(oa, &body->oa);
422 /* This should really be sent by the OST */
423 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
424 oa->o_valid |= OBD_MD_FLBLKSZ;
426 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
427 * have valid lsm_oinfo data structs, so don't go touching that.
428 * This needs to be fixed in a big way.
430 lsm->lsm_object_id = oa->o_id;
434 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
/* Stash the unlink llog cookie so the MDS can cancel it later. */
436 if (oa->o_valid & OBD_MD_FLCOOKIE) {
437 if (!oti->oti_logcookies)
438 oti_alloc_cookies(oti, 1);
439 *oti->oti_logcookies = oa->o_lcookie;
443 CDEBUG(D_HA, "transno: "LPD64"\n",
444 lustre_msg_get_transno(req->rq_repmsg));
446 ptlrpc_req_finished(req);
/* Error path: release the stripe md we allocated above. */
449 obd_free_memmd(exp, &lsm);
453 static int osc_punch_interpret(struct ptlrpc_request *req,
/* Async completion for OST_PUNCH: unpack the reply obdo and invoke the
 * caller's completion callback. */
456 struct ost_body *body;
457 struct osc_async_args *aa = data;
463 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
464 lustre_swab_ost_body);
466 CERROR ("can't unpack ost_body\n");
467 GOTO(out, rc = -EPROTO);
470 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
472 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
476 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
477 struct obd_trans_info *oti,
478 struct ptlrpc_request_set *rqset)
/* Asynchronous OST_PUNCH (truncate / hole-punch). The extent start/end
 * travel in the oa size/blocks fields (see comment below). */
480 struct ptlrpc_request *req;
481 struct osc_async_args *aa;
482 struct ost_body *body;
483 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
491 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
492 OST_PUNCH, 2, size, NULL);
496 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
497 ptlrpc_at_set_req_timeout(req);
499 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
500 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
502 /* overload the size and blocks fields in the oa with start/end */
503 body->oa.o_size = oinfo->oi_policy.l_extent.start;
504 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
505 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
507 ptlrpc_req_set_repsize(req, 2, size);
509 req->rq_interpret_reply = osc_punch_interpret;
510 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
511 aa = ptlrpc_req_async_args(req);
513 ptlrpc_set_add_req(rqset, req);
518 static int osc_sync_interpret(struct ptlrpc_request *req,
/* Async completion for OST_SYNC. NOTE(review): unlike the other
 * interpret handlers this copies body->oa directly instead of going
 * through lustre_get_wire_obdo() -- confirm that is intentional. */
521 struct ost_body *body;
522 struct osc_async_args *aa = data;
528 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529 lustre_swab_ost_body);
531 CERROR ("can't unpack ost_body\n");
532 GOTO(out, rc = -EPROTO);
535 *aa->aa_oi->oi_oa = body->oa;
537 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
541 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
542 obd_size start, obd_size end,
543 struct ptlrpc_request_set *set)
/* Asynchronous OST_SYNC over the byte range [start, end]; joins @set
 * with osc_sync_interpret as the completion handler. */
545 struct ptlrpc_request *req;
546 struct ost_body *body;
547 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
548 struct osc_async_args *aa;
556 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
557 OST_SYNC, 2, size, NULL);
561 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
562 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
564 /* overload the size and blocks fields in the oa with start/end */
565 body->oa.o_size = start;
566 body->oa.o_blocks = end;
567 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
569 ptlrpc_req_set_repsize(req, 2, size);
570 req->rq_interpret_reply = osc_sync_interpret;
572 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
573 aa = ptlrpc_req_async_args(req);
576 ptlrpc_set_add_req(set, req);
580 /* Find and cancel locally locks matched by @mode in the resource found by
581 * @objid. Found locks are added into @cancel list. Returns the amount of
582 * locks added to @cancels list. */
583 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
584 struct list_head *cancels, ldlm_mode_t mode,
587 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
588 struct ldlm_res_id res_id;
589 struct ldlm_resource *res;
/* Resource name is derived from the object id/group in @oa. */
593 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
594 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
/* Collect matching local locks into @cancels, then drop our resource
 * reference. */
598 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
599 lock_flags, 0, NULL);
600 ldlm_resource_putref(res);
604 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
/* Completion of an OST_DESTROY: release one in-flight destroy slot and
 * wake anyone throttled in osc_destroy(). */
607 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
609 atomic_dec(&cli->cl_destroy_in_flight);
610 cfs_waitq_signal(&cli->cl_destroy_waitq);
614 static int osc_can_send_destroy(struct client_obd *cli)
/* Try to reserve one destroy-RPC slot (capped at cl_max_rpcs_in_flight).
 * On failure the reservation is rolled back; the re-check after the
 * decrement closes the race with a concurrent osc_destroy_interpret(). */
616 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
617 cli->cl_max_rpcs_in_flight) {
618 /* The destroy request can be sent */
621 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
622 cli->cl_max_rpcs_in_flight) {
624 * The counter has been modified between the two atomic
627 cfs_waitq_signal(&cli->cl_destroy_waitq);
632 /* Destroy requests can be async always on the client, and we don't even really
633 * care about the return code since the client cannot do anything at all about
635 * When the MDS is unlinking a filename, it saves the file objects into a
636 * recovery llog, and these object records are cancelled when the OST reports
637 * they were destroyed and sync'd to disk (i.e. transaction committed).
638 * If the client dies, or the OST is down when the object should be destroyed,
639 * the records are not cancelled, and when the OST reconnects to the MDS next,
640 * it will retrieve the llog unlink logs and then sends the log cancellation
641 * cookies to the MDS after committing destroy transactions. */
642 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
643 struct lov_stripe_md *ea, struct obd_trans_info *oti,
644 struct obd_export *md_export)
646 CFS_LIST_HEAD(cancels);
647 struct ptlrpc_request *req;
648 struct ost_body *body;
649 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
650 sizeof(struct ldlm_request) };
651 int count, bufcount = 2;
652 struct client_obd *cli = &exp->exp_obd->u.cli;
660 LASSERT(oa->o_id != 0);
/* Cancel our own PW locks on the object first, discarding dirty data,
 * and piggy-back the cancels on the destroy RPC (early lock cancel). */
662 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
663 LDLM_FL_DISCARD_DATA);
664 if (exp_connect_cancelset(exp))
666 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
667 size, REQ_REC_OFF + 1, 0, &cancels, count);
671 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
672 ptlrpc_at_set_req_timeout(req);
674 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
/* Include the MDS llog cookie so the unlink record can be cancelled. */
676 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
677 oa->o_lcookie = *oti->oti_logcookies;
680 lustre_set_wire_obdo(&body->oa, oa);
681 ptlrpc_req_set_repsize(req, 2, size);
683 /* don't throttle destroy RPCs for the MDT */
684 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
685 req->rq_interpret_reply = osc_destroy_interpret;
686 if (!osc_can_send_destroy(cli)) {
687 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
691 * Wait until the number of on-going destroy RPCs drops
692 * under max_rpc_in_flight
694 l_wait_event_exclusive(cli->cl_destroy_waitq,
695 osc_can_send_destroy(cli), &lwi);
699 /* Do not wait for response */
700 ptlrpcd_add_req(req);
704 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
/* Report the client's dirty-cache and grant state to the server inside
 * @oa (o_dirty/o_undirty/o_grant/o_dropped), under the loi list lock.
 * The sanity CERRORs flag accounting bugs without stopping the caller. */
707 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
709 LASSERT(!(oa->o_valid & bits));
712 client_obd_list_lock(&cli->cl_loi_list_lock);
713 oa->o_dirty = cli->cl_dirty;
714 if (cli->cl_dirty > cli->cl_dirty_max) {
715 CERROR("dirty %lu > dirty_max %lu\n",
716 cli->cl_dirty, cli->cl_dirty_max);
718 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages + 1) {
719 /* The atomic_read() allowing the atomic_inc() are not covered
720 * by a lock thus they may safely race and trip this CERROR()
721 * unless we add in a small fudge factor (+1). */
722 CERROR("dirty %d > system dirty_max %d\n",
723 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
725 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
726 CERROR("dirty %lu - dirty_max %lu too big???\n",
727 cli->cl_dirty, cli->cl_dirty_max);
/* Normal case: advertise how much more the client could dirty. */
730 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
731 (cli->cl_max_rpcs_in_flight + 1);
732 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
734 oa->o_grant = cli->cl_avail_grant;
735 oa->o_dropped = cli->cl_lost_grant;
736 cli->cl_lost_grant = 0;
737 client_obd_list_unlock(&cli->cl_loi_list_lock);
738 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
739 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
743 static void osc_update_next_shrink(struct client_obd *cli)
/* Re-arm the grant-shrink timer: next shrink happens one
 * cl_grant_shrink_interval from now. */
745 cli->cl_next_shrink_grant =
746 cfs_time_shift(cli->cl_grant_shrink_interval);
747 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
748 cli->cl_next_shrink_grant);
751 /* caller must hold loi_list_lock */
752 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
/* Account one page of dirty cache against the client's grant: bump the
 * global dirty-page counter, move one page of grant from available to
 * dirty, and mark the page as grant-backed. */
754 atomic_inc(&obd_dirty_pages);
755 cli->cl_dirty += CFS_PAGE_SIZE;
756 cli->cl_avail_grant -= CFS_PAGE_SIZE;
757 pga->flag |= OBD_BRW_FROM_GRANT;
758 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
759 CFS_PAGE_SIZE, pga, pga->pg);
760 LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
761 cli->cl_avail_grant);
762 osc_update_next_shrink(cli);
765 /* the companion to osc_consume_write_grant, called when a brw has completed.
766 * must be called with the loi lock held. */
767 static void osc_release_write_grant(struct client_obd *cli,
768 struct brw_page *pga, int sent)
770 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* Pages that never consumed grant have nothing to release. */
773 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
778 pga->flag &= ~OBD_BRW_FROM_GRANT;
779 atomic_dec(&obd_dirty_pages);
780 cli->cl_dirty -= CFS_PAGE_SIZE;
/* Unsent page: its whole grant is lost (reported via o_dropped). */
782 cli->cl_lost_grant += CFS_PAGE_SIZE;
783 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
784 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
785 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
786 /* For short writes we shouldn't count parts of pages that
787 * span a whole block on the OST side, or our accounting goes
788 * wrong. Should match the code in filter_grant_check. */
789 int offset = pga->off & ~CFS_PAGE_MASK;
790 int count = pga->count + (offset & (blocksize - 1));
791 int end = (offset + pga->count) & (blocksize - 1);
793 count += blocksize - end;
795 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
796 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
797 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
798 cli->cl_avail_grant, cli->cl_dirty);
804 static unsigned long rpcs_in_flight(struct client_obd *cli)
/* Total BRW RPCs currently in flight: reads plus writes. */
806 return cli->cl_r_in_flight + cli->cl_w_in_flight;
809 /* caller must hold loi_list_lock */
810 void osc_wake_cache_waiters(struct client_obd *cli)
/* Wake threads blocked waiting for dirty-cache room or write grant.
 * Each waiter either receives a page of grant (osc_consume_write_grant)
 * or is told to fall back to sync IO (-EDQUOT). */
812 struct list_head *l, *tmp;
813 struct osc_cache_waiter *ocw;
816 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
817 /* if we can't dirty more, we must wait until some is written */
818 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
819 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
820 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
821 "osc max %ld, sys max %d\n", cli->cl_dirty,
822 cli->cl_dirty_max, obd_max_dirty_pages);
826 /* if still dirty cache but no grant wait for pending RPCs that
827 * may yet return us some grant before doing sync writes */
828 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
829 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
830 cli->cl_w_in_flight);
834 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
835 list_del_init(&ocw->ocw_entry);
836 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
837 /* no more RPCs in flight to return grant, do sync IO */
838 ocw->ocw_rc = -EDQUOT;
839 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
841 osc_consume_write_grant(cli,
842 &ocw->ocw_oap->oap_brw_page);
845 cfs_waitq_signal(&ocw->ocw_waitq);
851 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
/* Absorb grant returned by the server in an RPC reply into
 * cl_avail_grant (only when the reply actually carries a grant). */
853 client_obd_list_lock(&cli->cl_loi_list_lock);
854 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
855 if (body->oa.o_valid & OBD_MD_FLGRANT)
856 cli->cl_avail_grant += body->oa.o_grant;
857 /* waiters are woken in brw_interpret */
858 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Forward declaration: defined later in this file, used by the grant
 * shrink path below. */
861 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
862 void *key, obd_count vallen, void *val,
863 struct ptlrpc_request_set *set);
865 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
/* Completion of a grant-shrink RPC. On the failure path (elided lines)
 * the grant tentatively given back in osc_shrink_grant_to_target() is
 * restored; otherwise the server's reply grant is absorbed. */
868 struct osc_grant_args *aa = data;
869 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870 struct obdo *oa = aa->aa_oa;
871 struct ost_body *body;
874 client_obd_list_lock(&cli->cl_loi_list_lock);
875 cli->cl_avail_grant += oa->o_grant;
876 client_obd_list_unlock(&cli->cl_loi_list_lock);
879 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
880 lustre_swab_ost_body);
881 osc_update_grant(cli, body);
887 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
/* Give back a quarter of the available grant by piggy-backing it on an
 * outgoing request's obdo, flagged with OBD_FL_SHRINK_GRANT. */
889 client_obd_list_lock(&cli->cl_loi_list_lock);
890 oa->o_grant = cli->cl_avail_grant / 4;
891 cli->cl_avail_grant -= oa->o_grant;
892 client_obd_list_unlock(&cli->cl_loi_list_lock);
893 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
894 oa->o_valid |= OBD_MD_FLFLAGS;
897 oa->o_flags |= OBD_FL_SHRINK_GRANT;
898 osc_update_next_shrink(cli);
901 /* Shrink the current grant, either from some large amount to enough for a
902 * full set of in-flight RPCs, or if we have already shrunk to that limit
903 * then to enough for a single RPC. This avoids keeping more grant than
904 * needed, and avoids shrinking the grant piecemeal. */
905 static int osc_shrink_grant(struct client_obd *cli)
/* Target is (max RPCs in flight + 1) RPCs worth of pages; if we are
 * already at or under that, fall back to a single RPC's worth. */
907 long target = (cli->cl_max_rpcs_in_flight + 1) *
908 cli->cl_max_pages_per_rpc;
910 client_obd_list_lock(&cli->cl_loi_list_lock);
911 if (cli->cl_avail_grant <= target)
912 target = cli->cl_max_pages_per_rpc;
913 client_obd_list_unlock(&cli->cl_loi_list_lock);
915 return osc_shrink_grant_to_target(cli, target);
918 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
/* Return grant above @target to the server via a KEY_GRANT_SHRINK
 * set_info RPC. The local grant is reduced optimistically before the
 * RPC; it is restored if the send fails. */
921 struct ost_body *body;
924 client_obd_list_lock(&cli->cl_loi_list_lock);
925 /* Don't shrink if we are already above or below the desired limit
926 * We don't want to shrink below a single RPC, as that will negatively
927 * impact block allocation and long-term performance. */
928 if (target < cli->cl_max_pages_per_rpc)
929 target = cli->cl_max_pages_per_rpc;
931 if (target >= cli->cl_avail_grant) {
932 client_obd_list_unlock(&cli->cl_loi_list_lock);
935 client_obd_list_unlock(&cli->cl_loi_list_lock);
941 osc_announce_cached(cli, &body->oa, 0);
/* Move the excess from cl_avail_grant into the request's o_grant. */
943 client_obd_list_lock(&cli->cl_loi_list_lock);
944 body->oa.o_grant = cli->cl_avail_grant - target;
945 cli->cl_avail_grant = target;
946 client_obd_list_unlock(&cli->cl_loi_list_lock);
947 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
948 body->oa.o_valid |= OBD_MD_FLFLAGS;
949 body->oa.o_flags = 0;
951 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
952 osc_update_next_shrink(cli);
954 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
955 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
956 sizeof(*body), body, NULL);
/* Failure path: put the grant we tried to give back into avail. */
958 client_obd_list_lock(&cli->cl_loi_list_lock);
959 cli->cl_avail_grant += body->oa.o_grant;
960 client_obd_list_unlock(&cli->cl_loi_list_lock);
966 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
967 static int osc_should_shrink_grant(struct client_obd *client)
/* Decide whether to shrink grant now: the server must support grant
 * shrink, the shrink interval must have (nearly) elapsed, the import
 * must be FULL, and we must hold more than GRANT_SHRINK_LIMIT. */
969 cfs_time_t time = cfs_time_current();
970 cfs_time_t next_shrink = client->cl_next_shrink_grant;
972 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
973 OBD_CONNECT_GRANT_SHRINK) == 0)
/* Allow firing up to 5 ticks early to avoid timer-granularity misses. */
976 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
977 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
978 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
981 osc_update_next_shrink(client);
986 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
/* Periodic timeout callback: walk every client on the shrink list and
 * shrink grant for those that are due. */
988 struct client_obd *client;
990 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
991 if (osc_should_shrink_grant(client))
992 osc_shrink_grant(client);
997 static int osc_add_shrink_grant(struct client_obd *client)
/* Register this client with the periodic grant-shrink timeout and arm
 * its first shrink deadline. */
1001 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1003 osc_grant_shrink_grant_cb, NULL,
1004 &client->cl_grant_shrink_list);
1006 CERROR("add grant client %s error %d\n",
1007 client->cl_import->imp_obd->obd_name, rc);
1010 CDEBUG(D_CACHE, "add grant client %s \n",
1011 client->cl_import->imp_obd->obd_name);
1012 osc_update_next_shrink(client);
1016 static int osc_del_shrink_grant(struct client_obd *client)
/* Unregister this client from the periodic grant-shrink timeout. */
1018 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1022 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
/* Initialize cl_avail_grant from the server's connect reply, accounting
 * for dirty pages still outstanding (unless we were just evicted). */
1025 * ocd_grant is the total grant amount we're expect to hold: if we'v
1026 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1027 * to 0 as inflight rpcs fail out; otherwise, it's avail_grant + dirty.
1029 * race is tolerable here: if we're evicted, but imp_state already
1030 * left EVICTED state, then cl_diry must be 0 already.
1032 client_obd_list_lock(&cli->cl_loi_list_lock);
1033 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1034 cli->cl_avail_grant = ocd->ocd_grant;
1036 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
/* Negative grant means an old (unpatched) server; fall back to the raw
 * connect grant rather than operating with a negative balance. */
1038 if (cli->cl_avail_grant < 0) {
1039 CWARN("%s: available grant < 0, the OSS is probaly not running"
1040 " with patch from bug 20278 (%ld)\n",
1041 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1042 /* workaround for 1.6 servers which do not have
1043 * the patch from bug 20278 */
1044 cli->cl_avail_grant = ocd->ocd_grant;
1046 client_obd_list_unlock(&cli->cl_loi_list_lock);
1048 CDEBUG(D_CACHE, "%s: setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1049 cli->cl_import->imp_obd->obd_name,
1050 cli->cl_avail_grant, cli->cl_lost_grant);
/* Enroll in periodic grant shrinking if the server supports it. */
1052 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1053 list_empty(&cli->cl_grant_shrink_list))
1054 osc_add_shrink_grant(cli);
1057 /* We assume that the reason this OSC got a short read is because it read
1058 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1059 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1060 * this stripe never got written at or beyond this stripe offset yet. */
1061 static void handle_short_read(int nob_read, obd_count page_count,
1062 struct brw_page **pga, int pshift)
1067 /* skip bytes read OK */
1068 while (nob_read > 0) {
1069 LASSERT (page_count > 0);
/* Partial page at EOF: zero the tail beyond what was read. */
1071 if (pga[i]->count > nob_read) {
1072 /* EOF inside this page */
1073 ptr = cfs_kmap(pga[i]->pg) +
1074 (OSC_FILE2MEM_OFF(pga[i]->off,pshift)&~CFS_PAGE_MASK);
1075 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1076 cfs_kunmap(pga[i]->pg);
1082 nob_read -= pga[i]->count;
1087 /* zero remaining pages */
1088 while (page_count-- > 0) {
1089 ptr = cfs_kmap(pga[i]->pg) +
1090 (OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK);
1091 memset(ptr, 0, pga[i]->count);
1092 cfs_kunmap(pga[i]->pg);
1097 static int check_write_rcs(struct ptlrpc_request *req,
1098 int requested_nob, int niocount,
1099 obd_count page_count, struct brw_page **pga)
/* Validate the per-niobuf return codes and byte count of a BRW_WRITE
 * reply: any negative rc is propagated, any positive rc is a protocol
 * violation, and the transferred byte count must match the request. */
1103 /* return error if any niobuf was in error */
1104 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1105 sizeof(*remote_rcs) * niocount, NULL);
1106 if (remote_rcs == NULL) {
1107 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
/* Byte-swap the rc vector ourselves; repbuf swab above had no swabber. */
1110 if (lustre_rep_need_swab(req))
1111 for (i = 0; i < niocount; i++)
1112 __swab32s(&remote_rcs[i]);
1114 for (i = 0; i < niocount; i++) {
1115 if (remote_rcs[i] < 0)
1116 return(remote_rcs[i]);
1118 if (remote_rcs[i] != 0) {
1119 CERROR("rc[%d] invalid (%d) req %p\n",
1120 i, remote_rcs[i], req);
1125 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1126 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1127 req->rq_bulk->bd_nob_transferred, requested_nob);
1134 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
/* Two brw pages can share a niobuf only when byte-contiguous and with
 * compatible flags (grant/async bits are allowed to differ). */
1136 if (p1->flag != p2->flag) {
1137 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1139 /* warn if we try to combine flags that we don't know to be
1140 * safe to combine */
1141 if ((p1->flag & mask) != (p2->flag & mask))
1142 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1143 "same brw?\n", p1->flag, p2->flag);
1147 return (p1->off + p1->count == p2->off);
1150 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1151 struct brw_page **pga, int opc,
1152 cksum_type_t cksum_type, int pshift)
/* Checksum up to @nob bytes across the bulk pages in @pga, mapping each
 * page in turn. Fault-injection hooks deliberately corrupt read data /
 * the send checksum to exercise the retry paths. */
1157 LASSERT (pg_count > 0);
1158 cksum = init_checksum(cksum_type);
1159 while (nob > 0 && pg_count > 0) {
1160 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1161 int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
1162 int count = pga[i]->count > nob ? nob : pga[i]->count;
1164 /* corrupt the data before we compute the checksum, to
1165 * simulate an OST->client data error */
1166 if (i == 0 && opc == OST_READ &&
1167 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1168 memcpy(ptr + off, "bad1", min(4, nob));
1169 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1170 cfs_kunmap(pga[i]->pg);
1171 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1174 nob -= pga[i]->count;
1178 /* For sending we only compute the wrong checksum instead
1179 * of corrupting the data so it is still correct on a redo */
1180 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a bulk read/write (BRW) RPC for @page_count pages in @pga.
 * Adjacent mergeable pages are collapsed into shared niobuf_remote
 * entries, a bulk descriptor is attached to the request, and for writes
 * a client-side checksum may be computed and stored in both the wire
 * obdo and the caller's @oa.  The prepared request is returned to the
 * caller through @reqp.
 */
1186 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1187 struct lov_stripe_md *lsm, obd_count page_count,
1188 struct brw_page **pga,
1189 struct ptlrpc_request **reqp, int pshift,
1192 struct ptlrpc_request *req;
1193 struct ptlrpc_bulk_desc *desc;
1194 struct ost_body *body;
1195 struct obd_ioobj *ioobj;
1196 struct niobuf_remote *niobuf;
1197 __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1198 int niocount, i, requested_nob, opc, rc;
1199 struct ptlrpc_request_pool *pool;
1200 struct osc_brw_async_args *aa;
1201 struct brw_page *pg_prev;
1204 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1205 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
/* writes draw requests from the import's preallocated pool so they can
 * proceed under memory pressure; reads allocate normally */
1207 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1208 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
/* count remote niobufs needed: each run of pages that cannot be merged
 * with its predecessor starts a new niobuf */
1210 for (niocount = i = 1; i < page_count; i++) {
1211 if (!can_merge_pages(pga[i - 1], pga[i]))
1215 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1216 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1218 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1223 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1224 ptlrpc_at_set_req_timeout(req);
1226 if (opc == OST_WRITE)
1227 desc = ptlrpc_prep_bulk_imp (req, page_count,
1228 BULK_GET_SOURCE, OST_BULK_PORTAL);
1230 desc = ptlrpc_prep_bulk_imp (req, page_count,
1231 BULK_PUT_SINK, OST_BULK_PORTAL);
1233 GOTO(out, rc = -ENOMEM);
1234 /* NB request now owns desc and will free it when it gets freed */
1236 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1237 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1238 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1239 niocount * sizeof(*niobuf));
1241 lustre_set_wire_obdo(&body->oa, oa);
1242 obdo_to_ioobj(oa, ioobj);
1243 ioobj->ioo_bufcnt = niocount;
1245 LASSERT (page_count > 0);
/* pages must be in strictly ascending file-offset order, each fully
 * contained within one memory page after applying pshift */
1247 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1248 struct brw_page *pg = pga[i];
1250 LASSERT(pg->count > 0);
1251 LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
1252 pg->count <= CFS_PAGE_SIZE,
1253 "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
1254 i, pg, pg->off, pg->count, pshift);
1256 LASSERTF(i == 0 || pg->off > pg_prev->off,
1257 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1258 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1260 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1261 pg_prev->pg, page_private(pg_prev->pg),
1262 pg_prev->pg->index, pg_prev->off);
/* NOTE(review): same ordering assertion with a shorter message —
 * presumably the non-kernel (#else) variant of the check above;
 * confirm against the unelided source */
1264 LASSERTF(i == 0 || pg->off > pg_prev->off,
1265 "i %d p_c %u\n", i, page_count);
/* all pages in one RPC must agree on lockless (SRVLOCK) vs locked IO */
1267 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1268 (pg->flag & OBD_BRW_SRVLOCK));
1270 ptlrpc_prep_bulk_page(desc, pg->pg,
1271 OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
1273 requested_nob += pg->count;
/* extend the current niobuf when contiguous, else start a new one */
1275 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1277 niobuf->len += pg->count;
1279 niobuf->offset = pg->off;
1280 niobuf->len = pg->count;
1281 niobuf->flags = pg->flag;
1286 LASSERTF((void *)(niobuf - niocount) ==
1287 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1288 niocount * sizeof(*niobuf)),
1289 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1290 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1291 (void *)(niobuf - niocount));
1293 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1295 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1296 body->oa.o_valid |= OBD_MD_FLFLAGS;
1297 body->oa.o_flags = 0;
1299 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1302 if (osc_should_shrink_grant(cli))
1303 osc_shrink_grant_local(cli, &body->oa);
1305 /* size[REQ_REC_OFF] still sizeof (*body) */
1306 if (opc == OST_WRITE) {
1307 if (cli->cl_checksum) {
1308 /* store cl_cksum_type in a local variable since
1309 * it can be changed via lprocfs */
1310 cksum_type_t cksum_type = cli->cl_cksum_type;
1312 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
/* keep only the local flag bits in the caller's copy */
1313 oa->o_flags &= OBD_FL_LOCAL_MASK;
1314 body->oa.o_flags = 0;
1316 body->oa.o_flags |= cksum_type_pack(cksum_type);
1317 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1318 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1321 cksum_type, pshift);
1322 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1324 /* save this in 'oa', too, for later checking */
1325 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1326 oa->o_flags |= cksum_type_pack(cksum_type);
1328 /* clear out the checksum flag, in case this is a
1329 * resend but cl_checksum is no longer set. b=11238 */
1330 oa->o_valid &= ~OBD_MD_FLCKSUM;
1332 oa->o_cksum = body->oa.o_cksum;
1333 /* 1 RC per niobuf */
1334 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1335 ptlrpc_req_set_repsize(req, 3, size);
1337 if (cli->cl_checksum) {
1338 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1339 body->oa.o_flags = 0;
1340 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1341 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1343 /* 1 RC for the whole I/O */
1344 ptlrpc_req_set_repsize(req, 2, size);
/* stash per-request bookkeeping in the request's async args for the
 * completion/interpret path */
1347 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1348 aa = ptlrpc_req_async_args(req);
1350 aa->aa_requested_nob = requested_nob;
1351 aa->aa_nio_count = niocount;
1352 aa->aa_page_count = page_count;
1356 aa->aa_pshift = pshift;
1357 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* error path: drop the request (which also frees the bulk desc) */
1363 ptlrpc_req_finished (req);
/*
 * Diagnose a write checksum mismatch reported by the server.  When the
 * checksums agree, or the object is mmapped (its contents may change at
 * any time), the mismatch is not treated as corruption.  Otherwise the
 * checksum is recomputed locally with the server's checksum type to
 * classify where the data changed (client side, in transit, or both)
 * and the diagnosis is logged on the console.
 */
1367 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1368 __u32 client_cksum, __u32 server_cksum, int nob,
1369 obd_count page_count, struct brw_page **pga,
1370 cksum_type_t client_cksum_type, int pshift)
1374 cksum_type_t cksum_type;
1376 if (server_cksum == client_cksum) {
1377 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1381 /* If this is mmaped file - it can be changed at any time */
1382 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
/* use the checksum type the server replied with; fall back to CRC32
 * for servers that do not send flags */
1385 if (oa->o_valid & OBD_MD_FLFLAGS)
1386 cksum_type = cksum_type_unpack(oa->o_flags)
1388 cksum_type = OBD_CKSUM_CRC32;
1390 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1391 cksum_type, pshift);
1393 if (cksum_type != client_cksum_type)
1394 msg = "the server did not use the checksum type specified in "
1395 "the original request - likely a protocol problem";
1396 else if (new_cksum == server_cksum)
1397 msg = "changed on the client after we checksummed it - "
1398 "likely false positive due to mmap IO (bug 11742)";
1399 else if (new_cksum == client_cksum)
1400 msg = "changed in transit before arrival at OST";
1402 msg = "changed in transit AND doesn't match the original - "
1403 "likely false positive due to mmap IO (bug 11742)";
1405 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1406 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1407 "["LPU64"-"LPU64"]\n",
1408 msg, libcfs_nid2str(peer->nid),
1409 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1410 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1413 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1415 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1416 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1417 "client csum now %x\n", client_cksum, client_cksum_type,
1418 server_cksum, cksum_type, new_cksum);
1423 /* Note rc enters this function as number of bytes transferred */
/*
 * Complete a BRW RPC: unpack and swab the reply body, update quota and
 * grant state, then for writes verify the server's checksum against the
 * value saved in aa->aa_oa and check the per-niobuf RCs; for reads,
 * validate the byte count, handle short reads, and recompute/verify the
 * checksum if the server sent one.  Finishes by copying the reply obdo
 * back into the caller's obdo.
 */
1424 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1426 struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1427 const lnet_process_id_t *peer =
1428 &req->rq_import->imp_connection->c_peer;
1429 struct client_obd *cli = aa->aa_cli;
1430 struct ost_body *body;
1431 __u32 client_cksum = 0;
1434 if (rc < 0 && rc != -EDQUOT)
1437 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1438 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1439 lustre_swab_ost_body);
1441 CERROR ("Can't unpack body\n");
1445 /* set/clear over quota flag for a uid/gid */
1446 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1447 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1448 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1449 body->oa.o_gid, body->oa.o_valid,
1452 osc_update_grant(cli, body);
/* remember the checksum we computed at prep time (if any) so it can be
 * compared against the server's value below */
1457 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1458 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1460 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1462 CERROR ("Unexpected +ve rc %d\n", rc);
1465 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1467 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1468 check_write_checksum(&body->oa, peer, client_cksum,
1469 body->oa.o_cksum, aa->aa_requested_nob,
1470 aa->aa_page_count, aa->aa_ppga,
1471 cksum_type_unpack(aa->aa_oa->o_flags),
1475 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1476 aa->aa_page_count, aa->aa_ppga);
1480 /* The rest of this function executes only for OST_READs */
1481 if (rc > aa->aa_requested_nob) {
1482 CERROR("Unexpected rc %d (%d requested)\n", rc,
1483 aa->aa_requested_nob);
1487 if (rc != req->rq_bulk->bd_nob_transferred) {
1488 CERROR ("Unexpected rc %d (%d transferred)\n",
1489 rc, req->rq_bulk->bd_nob_transferred);
1493 if (rc < aa->aa_requested_nob)
1494 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga, aa->aa_pshift);
1496 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1497 static int cksum_counter;
1498 __u32 server_cksum = body->oa.o_cksum;
1501 cksum_type_t cksum_type;
1503 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1504 cksum_type = cksum_type_unpack(body->oa.o_flags);
1506 cksum_type = OBD_CKSUM_CRC32;
1507 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1508 aa->aa_ppga, OST_READ,
1509 cksum_type, aa->aa_pshift);
/* note whether the bulk came straight from the server or via a
 * router, for the error message below */
1511 if (peer->nid == req->rq_bulk->bd_sender) {
1515 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1518 if (server_cksum == ~0 && rc > 0) {
1519 CERROR("Protocol error: server %s set the 'checksum' "
1520 "bit, but didn't send a checksum. Not fatal, "
1521 "but please notify on http://bugzilla.lustre.org/\n",
1522 libcfs_nid2str(peer->nid));
1523 } else if (server_cksum != client_cksum) {
1524 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1525 "%s%s%s inum "LPU64"/"LPU64" object "
1526 LPU64"/"LPU64" extent "
1527 "["LPU64"-"LPU64"]\n",
1528 req->rq_import->imp_obd->obd_name,
1529 libcfs_nid2str(peer->nid),
1531 body->oa.o_valid & OBD_MD_FLFID ?
1532 body->oa.o_fid : (__u64)0,
1533 body->oa.o_valid & OBD_MD_FLFID ?
1534 body->oa.o_generation :(__u64)0,
1536 body->oa.o_valid & OBD_MD_FLGROUP ?
1537 body->oa.o_gr : (__u64)0,
1538 aa->aa_ppga[0]->off,
1539 aa->aa_ppga[aa->aa_page_count-1]->off +
1540 aa->aa_ppga[aa->aa_page_count-1]->count -
1542 CERROR("client %x, server %x, cksum_type %x\n",
1543 client_cksum, server_cksum, cksum_type);
1545 aa->aa_oa->o_cksum = client_cksum;
1549 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1552 } else if (unlikely(client_cksum)) {
1553 static int cksum_missed;
/* power-of-two throttling: only log when the miss count is a
 * power of two */
1556 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1557 CERROR("Checksum %u requested from %s but not sent\n",
1558 cksum_missed, libcfs_nid2str(peer->nid));
/* propagate the server's obdo (size, blocks, times, ...) back to the
 * caller's copy */
1564 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Synchronous BRW: build the request, queue it and wait for the reply,
 * finish it, and retry recoverable errors (with an interruptible wait
 * between attempts) until osc_should_resend() says to give up.
 */
1569 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1570 struct lov_stripe_md *lsm,
1571 obd_count page_count, struct brw_page **pga)
1573 struct ptlrpc_request *request;
1577 struct l_wait_info lwi;
1580 init_waitqueue_head(&waitq);
1583 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1584 page_count, pga, &request, 0, resends);
1588 rc = ptlrpc_queue_wait(request);
/* a bulk timeout with resend flagged is retried immediately */
1590 if (rc == -ETIMEDOUT && request->rq_resend) {
1591 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1592 ptlrpc_req_finished(request);
1596 rc = osc_brw_fini_request(request, rc);
1598 ptlrpc_req_finished(request);
1599 if (osc_recoverable_error(rc)) {
1601 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1602 CERROR("too many resend retries, returning error\n");
/* back off: wait 'resends' seconds (interruptible) before retrying */
1606 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1607 l_wait_event(waitq, 0, &lwi);
/*
 * Rebuild and resend a BRW request after a recoverable error.  The new
 * request adopts the pga and the oap list of the old one (the list is
 * moved, not copied), takes over its interpret callback and async args,
 * and is added to the original request set.  All list surgery happens
 * under cl_loi_list_lock; an interrupted oap aborts the redo.
 */
1614 int osc_brw_redo_request(struct ptlrpc_request *request,
1615 struct osc_brw_async_args *aa)
1617 struct ptlrpc_request *new_req;
1618 struct ptlrpc_request_set *set = request->rq_set;
1619 struct osc_brw_async_args *new_aa;
1620 struct osc_async_page *oap;
1624 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1625 CERROR("too many resent retries, returning error\n");
1629 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1631 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1632 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1633 aa->aa_cli, aa->aa_oa,
1634 NULL /* lsm unused by osc currently */,
1635 aa->aa_page_count, aa->aa_ppga, &new_req,
1640 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* sanity: every queued oap must still reference the old request; if
 * the sync waiter was interrupted, abandon the redo */
1642 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1643 if (oap->oap_request != NULL) {
1644 LASSERTF(request == oap->oap_request,
1645 "request %p != oap_request %p\n",
1646 request, oap->oap_request);
1647 if (oap->oap_interrupted) {
1648 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1649 ptlrpc_req_finished(new_req);
1654 /* New request takes over pga and oaps from old request.
1655 * Note that copying a list_head doesn't work, need to move it... */
1657 new_req->rq_interpret_reply = request->rq_interpret_reply;
1658 new_req->rq_async_args = request->rq_async_args;
/* delay the resend: schedule it aa_resends seconds into the future */
1659 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1661 new_aa = ptlrpc_req_async_args(new_req);
1663 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1664 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1665 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* retarget each oap's request reference from the old to new request */
1667 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1668 if (oap->oap_request) {
1669 ptlrpc_req_finished(oap->oap_request);
1670 oap->oap_request = ptlrpc_request_addref(new_req);
1674 /* use ptlrpc_set_add_req is safe because interpret functions work
1675 * in check_set context. only one way exist with access to request
1676 * from different thread got -EINTR - this way protected with
1677 * cl_loi_list_lock */
1678 ptlrpc_set_add_req(set, new_req);
1680 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1682 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Fire a BRW RPC asynchronously via request set @set.  For aligned
 * writes, write grant is consumed up front and released again if the
 * request cannot be built.  Updates read/write page and RPC histograms
 * and the dio in-flight counters, then hands the request to the set
 * with brw_interpret as its completion callback.
 */
1686 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1687 struct lov_stripe_md *lsm, obd_count page_count,
1688 struct brw_page **pga, struct ptlrpc_request_set *set,
1691 struct ptlrpc_request *request;
1692 struct client_obd *cli = &exp->exp_obd->u.cli;
1694 struct osc_brw_async_args *aa;
1697 /* Consume write credits even if doing a sync write -
1698 * otherwise we may run out of space on OST due to grant. */
1699 /* FIXME: unaligned writes must use write grants too */
1700 if (cmd == OBD_BRW_WRITE && pshift == 0) {
1701 client_obd_list_lock(&cli->cl_loi_list_lock);
1702 for (i = 0; i < page_count; i++) {
1703 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1704 osc_consume_write_grant(cli, pga[i]);
1706 client_obd_list_unlock(&cli->cl_loi_list_lock);
1709 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1710 page_count, pga, &request, pshift, 0);
1712 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1715 aa = ptlrpc_req_async_args(request);
1716 /* Do we need to separate dio stats? */
1717 if (cmd == OBD_BRW_READ) {
1718 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1719 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1721 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1722 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1723 cli->cl_w_in_flight);
1725 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
/* direct-IO path: no oaps are attached to this request */
1727 LASSERT(list_empty(&aa->aa_oaps));
1729 request->rq_interpret_reply = brw_interpret;
1730 ptlrpc_set_add_req(set, request);
1732 client_obd_list_lock(&cli->cl_loi_list_lock);
1733 if (cmd == OBD_BRW_READ)
1734 cli->cl_dio_r_in_flight++;
1736 cli->cl_dio_w_in_flight++;
1737 client_obd_list_unlock(&cli->cl_loi_list_lock);
1739 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
/* prep failed for a write: give back the grant taken above */
1740 } else if (cmd == OBD_BRW_WRITE) {
1741 client_obd_list_lock(&cli->cl_loi_list_lock);
1742 for (i = 0; i < page_count; i++)
1743 osc_release_write_grant(cli, pga[i], 0);
1744 osc_wake_cache_waiters(cli);
1745 client_obd_list_unlock(&cli->cl_loi_list_lock);
1752 * ugh, we want disk allocation on the target to happen in offset order. we'll
1753 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1754 * fine for our small page arrays and doesn't require allocation. its an
1755 * insertion sort that swaps elements that are strides apart, shrinking the
1756 * stride down until its '1' and the array is sorted.
/* In-place shellsort of @array by ascending file offset (->off). */
1758 static void sort_brw_pages(struct brw_page **array, int num)
1761 struct brw_page *tmp;
/* grow the stride through the 1,4,13,40,... (3h+1) sequence */
1765 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* gapped insertion sort, shrinking the stride each pass */
1770 for (i = stride ; i < num ; i++) {
1773 while (j >= stride && array[j-stride]->off > tmp->off) {
1774 array[j] = array[j - stride];
1779 } while (stride > 1);
/*
 * Return how many leading entries of @pg form one unfragmented run:
 * the first page may start mid-page and the last may end mid-page, but
 * every interior boundary must fall exactly on a memory-page boundary
 * (after applying @pshift).  Used to size a single BRW RPC.
 */
1782 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1789 LASSERT (pages > 0);
1790 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1794 if (pages == 0) /* that's all */
1797 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1798 return count; /* doesn't end on page boundary */
1801 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1802 if (offset != 0) /* doesn't start on page boundary */
/* Allocate and fill an array of pointers into the contiguous @pga
 * array (so chunks can be sorted/advanced independently); the caller
 * releases it with osc_release_ppga(). */
1809 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1811 struct brw_page **ppga;
1814 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1818 for (i = 0; i < count; i++)
/* Free a pointer array previously built by osc_build_ppga(). */
1823 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1825 LASSERT(ppga != NULL);
1826 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous BRW entry point.  Builds a pointer array over @pga, sorts
 * it by offset, and issues the I/O in chunks of at most
 * cl_max_pages_per_rpc unfragmented pages via osc_brw_internal().
 * Because each brw clobbers oinfo->oi_oa, a copy is saved before the
 * first multi-chunk RPC and restored before each subsequent one.
 */
1829 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1830 obd_count page_count, struct brw_page *pga,
1831 struct obd_trans_info *oti)
1833 struct obdo *saved_oa = NULL;
1834 struct brw_page **ppga, **orig;
1835 struct obd_import *imp = class_exp2cliimp(exp);
1836 struct client_obd *cli;
1837 int rc, page_count_orig;
1840 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1841 cli = &imp->imp_obd->u.cli;
1843 if (cmd & OBD_BRW_CHECK) {
1844 /* The caller just wants to know if there's a chance that this
1845 * I/O can succeed */
1847 if (imp->imp_invalid)
1852 /* test_brw with a failed create can trip this, maybe others. */
1853 LASSERT(cli->cl_max_pages_per_rpc);
1857 orig = ppga = osc_build_ppga(pga, page_count);
/* remember the original count: ppga/page_count are advanced per chunk
 * but the whole array must be freed at the end */
1860 page_count_orig = page_count;
1862 sort_brw_pages(ppga, page_count);
1863 while (page_count) {
1864 obd_count pages_per_brw;
1866 if (page_count > cli->cl_max_pages_per_rpc)
1867 pages_per_brw = cli->cl_max_pages_per_rpc;
1869 pages_per_brw = page_count;
1871 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);
1873 if (saved_oa != NULL) {
1874 /* restore previously saved oa */
1875 *oinfo->oi_oa = *saved_oa;
1876 } else if (page_count > pages_per_brw) {
1877 /* save a copy of oa (brw will clobber it) */
1878 OBDO_ALLOC(saved_oa);
1879 if (saved_oa == NULL)
1880 GOTO(out, rc = -ENOMEM);
1881 *saved_oa = *oinfo->oi_oa;
1884 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1885 pages_per_brw, ppga);
1890 page_count -= pages_per_brw;
1891 ppga += pages_per_brw;
1895 osc_release_ppga(orig, page_count_orig);
1897 if (saved_oa != NULL)
1898 OBDO_FREE(saved_oa);
/*
 * Asynchronous BRW entry point.  Splits the sorted page array into
 * per-RPC chunks; when more than one RPC will fly, each chunk gets its
 * own copy of the ppga slice and of the obdo.  The copied obdo is
 * tagged OBD_FL_TEMPORARY so brw_interpret() knows it must free it;
 * chunk memory passed to async_internal() becomes its responsibility.
 */
1903 static int osc_brw_async(int cmd, struct obd_export *exp,
1904 struct obd_info *oinfo, obd_count page_count,
1905 struct brw_page *pga, struct obd_trans_info *oti,
1906 struct ptlrpc_request_set *set, int pshift)
1908 struct brw_page **ppga, **orig;
1909 int page_count_orig;
1913 if (cmd & OBD_BRW_CHECK) {
1914 /* The caller just wants to know if there's a chance that this
1915 * I/O can succeed */
1916 struct obd_import *imp = class_exp2cliimp(exp);
1918 if (imp == NULL || imp->imp_invalid)
1923 orig = ppga = osc_build_ppga(pga, page_count);
1926 page_count_orig = page_count;
1928 sort_brw_pages(ppga, page_count);
1929 while (page_count) {
1930 struct brw_page **copy;
1932 obd_count pages_per_brw;
1934 /* one page less under unaligned direct i/o */
1935 pages_per_brw = min_t(obd_count, page_count,
1936 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
1939 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
1942 /* use ppga only if single RPC is going to fly */
1943 if (pages_per_brw != page_count_orig || ppga != orig) {
1944 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1946 GOTO(out, rc = -ENOMEM);
1947 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1951 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1952 GOTO(out, rc = -ENOMEM);
1954 memcpy(oa, oinfo->oi_oa, sizeof(*oa));
/* mark the copied obdo so the interpret callback frees it */
1955 if (oa->o_valid & OBD_MD_FLFLAGS) {
1956 oa->o_flags |= OBD_FL_TEMPORARY;
1958 oa->o_valid |= OBD_MD_FLFLAGS;
1959 oa->o_flags = OBD_FL_TEMPORARY;
/* single-RPC path reuses the caller's obdo, which must not carry
 * the temporary tag */
1964 LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
1967 rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
/* on failure, free the chunk copies we made above */
1972 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1974 if (oa->o_valid & OBD_MD_FLFLAGS &&
1975 oa->o_flags & OBD_FL_TEMPORARY)
1981 /* we passed it to async_internal() which is
1982 * now responsible for releasing memory */
1986 page_count -= pages_per_brw;
1987 ppga += pages_per_brw;
1991 osc_release_ppga(orig, page_count_orig);
1995 static void osc_check_rpcs(struct client_obd *cli);
1997 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1998 * the dirty accounting. Writeback completes or truncate happens before
1999 * writing starts. Must be called with the loi lock held. */
2000 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* NOTE(review): @sent is forwarded to osc_release_write_grant();
 * presumably it indicates whether the page actually went to the wire —
 * confirm against that function's definition */
2003 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
2006 /* This maintains the lists of pending pages to read/write for a given object
2007 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
2008 * to quickly find objects that are ready to send an RPC. */
/* @cmd is OBD_BRW_READ or OBD_BRW_WRITE; returns whether @lop has
 * enough (or urgent enough) pages to justify sending an RPC now. */
2009 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
2015 if (lop->lop_num_pending == 0)
2018 /* if we have an invalid import we want to drain the queued pages
2019 * by forcing them through rpcs that immediately fail and complete
2020 * the pages. recovery relies on this to empty the queued pages
2021 * before canceling the locks and evicting down the llite pages */
2022 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2025 /* stream rpcs in queue order as long as as there is an urgent page
2026 * queued. this is our cheap solution for good batching in the case
2027 * where writepage marks some random page in the middle of the file
2028 * as urgent because of, say, memory pressure */
2029 if (!list_empty(&lop->lop_urgent)) {
2030 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2034 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
2035 optimal = cli->cl_max_pages_per_rpc;
2036 if (cmd & OBD_BRW_WRITE) {
2037 /* trigger a write rpc stream as long as there are dirtiers
2038 * waiting for space. as they're waiting, they're not going to
2039 * create more pages to coallesce with what's waiting.. */
2040 if (!list_empty(&cli->cl_cache_waiters)) {
2041 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2045 /* +16 to avoid triggering rpcs that would want to include pages
2046 * that are being queued but which can't be made ready until
2047 * the queuer finishes with the page. this is a wart for
2048 * llite::commit_write() */
2051 if (lop->lop_num_pending >= optimal)
/* Check whether the head of @lop's urgent list is a high-priority
 * (ASYNC_HP) page, which forces an immediate RPC. */
2057 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2059 struct osc_async_page *oap;
2062 if (list_empty(&lop->lop_urgent))
2065 oap = list_entry(lop->lop_urgent.next,
2066 struct osc_async_page, oap_urgent_item);
2068 if (oap->oap_async_flags & ASYNC_HP) {
2069 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Make @item's membership of @list match @should_be_on: link it at the
 * tail if it should be on but isn't, unlink it if it shouldn't be but
 * is; otherwise do nothing. */
2076 static void on_list(struct list_head *item, struct list_head *list,
2079 if (list_empty(item) && should_be_on)
2080 list_add_tail(item, list);
2081 else if (!list_empty(item) && !should_be_on)
2082 list_del_init(item);
2085 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2086 * can find pages to build into rpcs quickly */
2087 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* high-priority work takes precedence: an loi on the hp-ready list is
 * removed from the normal ready list, and vice versa */
2089 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2090 lop_makes_hprpc(&loi->loi_read_lop)) {
2092 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2093 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2095 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2096 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2097 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2098 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* write/read membership simply tracks whether any pages are pending */
2101 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2102 loi->loi_write_lop.lop_num_pending);
2104 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2105 loi->loi_read_lop.lop_num_pending);
/* Adjust @lop's pending-page count and the client-wide pending
 * read/write page counter by @delta (positive on queue, negative on
 * dequeue). */
2108 static void lop_update_pending(struct client_obd *cli,
2109 struct loi_oap_pages *lop, int cmd, int delta)
2111 lop->lop_num_pending += delta;
2112 if (cmd & OBD_BRW_WRITE)
2113 cli->cl_pending_w_pages += delta;
2115 cli->cl_pending_r_pages += delta;
2118 /* this is called when a sync waiter receives an interruption. Its job is to
2119 * get the caller woken as soon as possible. If its page hasn't been put in an
2120 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2121 * desiring interruption which will forcefully complete the rpc once the rpc
2123 static void osc_occ_interrupted(struct oig_callback_context *occ)
2125 struct osc_async_page *oap;
2126 struct loi_oap_pages *lop;
2127 struct lov_oinfo *loi;
2130 /* XXX member_of() */
/* recover the containing oap from its embedded occ member */
2131 oap = list_entry(occ, struct osc_async_page, oap_occ);
2133 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
2135 oap->oap_interrupted = 1;
2137 /* ok, it's been put in an rpc. only one oap gets a request reference */
2138 if (oap->oap_request != NULL) {
2139 ptlrpc_mark_interrupted(oap->oap_request);
2140 ptlrpcd_wake(oap->oap_request);
2144 /* we don't get interruption callbacks until osc_trigger_group_io()
2145 * has been called and put the sync oaps in the pending/urgent lists.*/
2146 if (!list_empty(&oap->oap_pending_item)) {
2147 list_del_init(&oap->oap_pending_item);
2148 list_del_init(&oap->oap_urgent_item);
2151 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2152 &loi->loi_write_lop : &loi->loi_read_lop;
2153 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2154 loi_list_maint(oap->oap_cli, oap->oap_loi);
/* wake the group-io waiter with -EINTR */
2156 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
2157 oap->oap_oig = NULL;
2161 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
2164 /* this is trying to propogate async writeback errors back up to the
2165 * application. As an async write fails we record the error code for later if
2166 * the app does an fsync. As long as errors persist we force future rpcs to be
2167 * sync so that the app can get a sync error and break the cycle of queueing
2168 * pages for which writeback will fail. */
2169 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* NOTE(review): lines elided in this listing — the visible branches
 * show: on failure, force sync mode and record the next xid as the
 * recovery watermark ... */
2176 ar->ar_force_sync = 1;
2177 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* ... and on success, clear sync mode once an rpc at or past the
 * watermark xid has completed */
2182 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2183 ar->ar_force_sync = 0;
/*
 * Queue @oap on its loi's read or write pending list and, if flagged,
 * on the urgent list: ASYNC_HP pages go to the head of the urgent list,
 * ASYNC_URGENT to the tail.  Pending counters are updated accordingly.
 */
2186 static void osc_oap_to_pending(struct osc_async_page *oap)
2188 struct loi_oap_pages *lop;
2190 if (oap->oap_cmd & OBD_BRW_WRITE)
2191 lop = &oap->oap_loi->loi_write_lop;
2193 lop = &oap->oap_loi->loi_read_lop;
2195 if (oap->oap_async_flags & ASYNC_HP)
2196 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2197 else if (oap->oap_async_flags & ASYNC_URGENT)
2198 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2199 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2200 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2203 /* this must be called holding the loi list lock to give coverage to exit_cache,
2204 * async_flag maintenance, and oap_request */
/* Complete one async page: drop its request reference, record write
 * errors in the async_rc trackers, mirror reply attributes into the
 * loi's lvb, and notify the upper layer via ap_completion. */
2205 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
2206 struct osc_async_page *oap, int sent, int rc)
2211 if (oap->oap_request != NULL) {
2212 xid = ptlrpc_req_xid(oap->oap_request);
2213 ptlrpc_req_finished(oap->oap_request);
2214 oap->oap_request = NULL;
/* async_flags are protected by oap_lock */
2217 spin_lock(&oap->oap_lock);
2218 oap->oap_async_flags = 0;
2219 spin_unlock(&oap->oap_lock);
2220 oap->oap_interrupted = 0;
2222 if (oap->oap_cmd & OBD_BRW_WRITE) {
2223 osc_process_ar(&cli->cl_ar, xid, rc);
2224 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* on success, copy size/time attributes from the reply obdo into the
 * loi's lock-value-block */
2227 if (rc == 0 && oa != NULL) {
2228 if (oa->o_valid & OBD_MD_FLBLOCKS)
2229 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2230 if (oa->o_valid & OBD_MD_FLMTIME)
2231 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2232 if (oa->o_valid & OBD_MD_FLATIME)
2233 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2234 if (oa->o_valid & OBD_MD_FLCTIME)
2235 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2239 osc_exit_cache(cli, oap, sent);
2240 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2241 oap->oap_oig = NULL;
/* notify the upper layer (e.g. llite) that this page's I/O is done */
2246 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2247 oap->oap_cmd, oa, rc);
2249 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2250 * I/O on the page could start, but OSC calls it under lock
2251 * and thus we can add oap back to pending safely */
2253 /* upper layer wants to leave the page on pending queue */
2254 osc_oap_to_pending(oap);
2256 osc_exit_cache(cli, oap, sent);
/*
 * Request-set interpret callback for BRW RPCs.  Finishes the request,
 * redoes it on recoverable errors (with a single retry for mmapped
 * files), then either completes the attached oaps (cached-IO path) or
 * releases grant and the temporary obdo (direct-IO path), maintains the
 * in-flight counters, wakes cache waiters and kicks off further RPCs.
 */
2260 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2262 struct osc_brw_async_args *aa = data;
2263 struct client_obd *cli;
2266 rc = osc_brw_fini_request(request, rc);
2267 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2269 if (osc_recoverable_error(rc)) {
2270 /* Only retry once for mmaped files since the mmaped page
2271 * might be modified at anytime. We have to retry at least
2272 * once in case there WAS really a corruption of the page
2273 * on the network, that was not caused by mmap() modifying
2274 * the page. bug 11742 */
2275 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2276 aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2277 aa->aa_oa->o_flags & OBD_FL_MMAP) {
2280 rc = osc_brw_redo_request(request, aa);
2287 client_obd_list_lock(&cli->cl_loi_list_lock);
2288 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2289 struct osc_async_page *oap, *tmp;
2291 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2292 * is called so we know whether to go to sync BRWs or wait for more
2293 * RPCs to complete */
2294 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2295 cli->cl_w_in_flight--;
2297 cli->cl_r_in_flight--;
2299 /* the caller may re-use the oap after the completion call so
2300 * we need to clean it up a little */
2301 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2302 list_del_init(&oap->oap_rpc_item);
2303 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2305 OBDO_FREE(aa->aa_oa);
2306 } else { /* from async_internal() */
/* direct-IO path: release per-page grant taken by async_internal() */
2308 for (i = 0; i < aa->aa_page_count; i++)
2309 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
/* free the obdo only if osc_brw_async() copied it for this chunk */
2311 if (aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2312 aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2313 OBDO_FREE(aa->aa_oa);
2315 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2316 cli->cl_dio_w_in_flight--;
2318 cli->cl_dio_r_in_flight--;
2320 osc_wake_cache_waiters(cli);
2321 osc_check_rpcs(cli);
2322 client_obd_list_unlock(&cli->cl_loi_list_lock);
2324 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Assemble a BRW RPC from the oaps queued on @rpc_list: build a sorted
 * brw_page array, fill an obdo via the caller's ap_fill_obdo callback,
 * prep the request, then update timestamps directly in the wire obdo
 * (after the request — and thus its xid — exists, see bug 10150) and
 * splice the oaps into the request's async args.  Returns the prepared
 * request or an ERR_PTR.
 */
2329 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2330 struct list_head *rpc_list,
2331 int page_count, int cmd)
2333 struct ptlrpc_request *req;
2334 struct brw_page **pga = NULL;
2335 struct osc_brw_async_args *aa;
2336 struct obdo *oa = NULL;
2337 struct obd_async_page_ops *ops = NULL;
2338 void *caller_data = NULL;
2339 struct osc_async_page *oap;
2340 struct ldlm_lock *lock = NULL;
2345 LASSERT(!list_empty(rpc_list));
2347 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2349 RETURN(ERR_PTR(-ENOMEM));
2353 GOTO(out, req = ERR_PTR(-ENOMEM));
/* collect each oap's brw_page into pga; ops/caller_data/lock are taken
 * from the oaps as the loop runs */
2356 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2358 ops = oap->oap_caller_ops;
2359 caller_data = oap->oap_caller_data;
2360 lock = oap->oap_ldlm_lock;
2362 pga[i] = &oap->oap_brw_page;
2363 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2364 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2365 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2369 /* always get the data for the obdo for the rpc */
2370 LASSERT(ops != NULL);
2371 ops->ap_fill_obdo(caller_data, cmd, oa);
2373 oa->o_handle = lock->l_remote_handle;
2374 oa->o_valid |= OBD_MD_FLHANDLE;
2377 sort_brw_pages(pga, page_count);
2378 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0,
2381 CERROR("prep_req failed: %d\n", rc);
2382 GOTO(out, req = ERR_PTR(rc));
/* from here on, operate on the obdo embedded in the request message */
2384 oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2385 sizeof(struct ost_body)))->oa;
2387 /* Need to update the timestamps after the request is built in case
2388 * we race with setattr (locally or in queue at OST). If OST gets
2389 * later setattr before earlier BRW (as determined by the request xid),
2390 * the OST will not use BRW timestamps. Sadly, there is no obvious
2391 * way to do this in a single call. bug 10150 */
2392 if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2393 /* in case of lockless read/write do not use inode's
2394 * timestamps because concurrent stat might fill the
2395 * inode with out-of-date times, send current
2397 if (cmd & OBD_BRW_WRITE) {
2398 oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2399 oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2400 valid = OBD_MD_FLATIME;
2402 oa->o_atime = LTIME_S(CURRENT_TIME);
2403 oa->o_valid |= OBD_MD_FLATIME;
2404 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2407 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2409 ops->ap_update_obdo(caller_data, cmd, oa, valid);
/* move the oaps onto the request; a raw list_head copy would leave
 * dangling pointers, so splice then re-init the source list */
2411 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2412 aa = ptlrpc_req_async_args(req);
2413 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2414 list_splice(rpc_list, &aa->aa_oaps);
2415 CFS_INIT_LIST_HEAD(rpc_list);
2422 OBD_FREE(pga, sizeof(*pga) * page_count);
2427 /* the loi lock is held across this function but it's allowed to release
2428 * and reacquire it during its work */
2430 * prepare pages for ASYNC io and put pages in send queue.
2434 * \param cmd - OBD_BRW_* macros
2435 * \param lop - pending pages
2437 * \return zero if pages successfully add to send queue.
2438 * \return nonzero if an error occurred.
2440 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2441 int cmd, struct loi_oap_pages *lop)
2443 struct ptlrpc_request *req;
2444 obd_count page_count = 0;
2445 struct osc_async_page *oap = NULL, *tmp;
2446 struct osc_brw_async_args *aa;
2447 struct obd_async_page_ops *ops;
2448 CFS_LIST_HEAD(rpc_list);
2449 unsigned int ending_offset;
2450 unsigned starting_offset = 0;
2454 /* If there are HP OAPs we need to handle at least 1 of them,
2455 * move it the beginning of the pending list for that. */
2456 if (!list_empty(&lop->lop_urgent)) {
2457 oap = list_entry(lop->lop_urgent.next,
2458 struct osc_async_page, oap_urgent_item);
2459 if (oap->oap_async_flags & ASYNC_HP)
2460 list_move(&oap->oap_pending_item, &lop->lop_pending);
2463 /* first we find the pages we're allowed to work with */
2464 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2465 ops = oap->oap_caller_ops;
2467 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2468 "magic 0x%x\n", oap, oap->oap_magic);
2470 if (page_count != 0 &&
2471 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2472 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2473 " oap %p, page %p, srvlock %u\n",
2474 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2477 /* in llite being 'ready' equates to the page being locked
2478 * until completion unlocks it. commit_write submits a page
2479 * as not ready because its unlock will happen unconditionally
2480 * as the call returns. if we race with commit_write giving
2481 * us that page we dont' want to create a hole in the page
2482 * stream, so we stop and leave the rpc to be fired by
2483 * another dirtier or kupdated interval (the not ready page
2484 * will still be on the dirty list). we could call in
2485 * at the end of ll_file_write to process the queue again. */
2486 if (!(oap->oap_async_flags & ASYNC_READY)) {
2487 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2489 CDEBUG(D_INODE, "oap %p page %p returned %d "
2490 "instead of ready\n", oap,
2494 /* llite is telling us that the page is still
2495 * in commit_write and that we should try
2496 * and put it in an rpc again later. we
2497 * break out of the loop so we don't create
2498 * a hole in the sequence of pages in the rpc
2503 /* the io isn't needed.. tell the checks
2504 * below to complete the rpc with EINTR */
2505 spin_lock(&oap->oap_lock);
2506 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2507 spin_unlock(&oap->oap_lock);
2508 oap->oap_count = -EINTR;
2511 spin_lock(&oap->oap_lock);
2512 oap->oap_async_flags |= ASYNC_READY;
2513 spin_unlock(&oap->oap_lock);
2516 LASSERTF(0, "oap %p page %p returned %d "
2517 "from make_ready\n", oap,
2525 * Page submitted for IO has to be locked. Either by
2526 * ->ap_make_ready() or by higher layers.
2528 #if defined(__KERNEL__) && defined(__linux__)
2529 if(!(PageLocked(oap->oap_page) &&
2530 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2531 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2532 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2536 /* If there is a gap at the start of this page, it can't merge
2537 * with any previous page, so we'll hand the network a
2538 * "fragmented" page array that it can't transfer in 1 RDMA */
2539 if (page_count != 0 && oap->oap_page_off != 0)
2542 /* take the page out of our book-keeping */
2543 list_del_init(&oap->oap_pending_item);
2544 lop_update_pending(cli, lop, cmd, -1);
2545 list_del_init(&oap->oap_urgent_item);
2547 if (page_count == 0)
2548 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2549 (PTLRPC_MAX_BRW_SIZE - 1);
2551 /* ask the caller for the size of the io as the rpc leaves. */
2552 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2554 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2555 if (oap->oap_count <= 0) {
2556 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2558 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2562 /* now put the page back in our accounting */
2563 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2564 if (page_count == 0)
2565 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2566 if (++page_count >= cli->cl_max_pages_per_rpc)
2569 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2570 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2571 * have the same alignment as the initial writes that allocated
2572 * extents on the server. */
2573 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2574 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2575 if (ending_offset == 0)
2578 /* If there is a gap at the end of this page, it can't merge
2579 * with any subsequent pages, so we'll hand the network a
2580 * "fragmented" page array that it can't transfer in 1 RDMA */
2581 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2585 osc_wake_cache_waiters(cli);
2587 if (page_count == 0)
2590 loi_list_maint(cli, loi);
2592 client_obd_list_unlock(&cli->cl_loi_list_lock);
2594 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2596 /* this should happen rarely and is pretty bad, it makes the
2597 * pending list not follow the dirty order */
2598 client_obd_list_lock(&cli->cl_loi_list_lock);
2599 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2600 list_del_init(&oap->oap_rpc_item);
2602 /* queued sync pages can be torn down while the pages
2603 * were between the pending list and the rpc */
2604 if (oap->oap_interrupted) {
2605 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2606 osc_ap_completion(cli, NULL, oap, 0,
2610 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2612 loi_list_maint(cli, loi);
2613 RETURN(PTR_ERR(req));
2616 aa = ptlrpc_req_async_args(req);
2617 if (cmd == OBD_BRW_READ) {
2618 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2619 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2620 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2621 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2623 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2624 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2625 cli->cl_w_in_flight);
2626 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2627 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2629 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2631 client_obd_list_lock(&cli->cl_loi_list_lock);
2633 if (cmd == OBD_BRW_READ)
2634 cli->cl_r_in_flight++;
2636 cli->cl_w_in_flight++;
2638 /* queued sync pages can be torn down while the pages
2639 * were between the pending list and the rpc */
2641 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2642 /* only one oap gets a request reference */
2645 if (oap->oap_interrupted && !req->rq_intr) {
2646 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2648 ptlrpc_mark_interrupted(req);
2652 tmp->oap_request = ptlrpc_request_addref(req);
2654 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2655 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2657 req->rq_interpret_reply = brw_interpret;
2658 ptlrpcd_add_req(req);
/*
 * LOI_DEBUG(): dump an object's readiness state and its pending/urgent
 * read and write page counts to the D_INODE debug log.
 * NOTE(review): the final continuation line of this macro is elided from
 * this listing (gap in the embedded numbering after 2669).
 */
2662 #define LOI_DEBUG(LOI, STR, args...) \
2663 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2664 !list_empty(&(LOI)->loi_ready_item) || \
2665 !list_empty(&(LOI)->loi_hp_ready_item), \
2666 (LOI)->loi_write_lop.lop_num_pending, \
2667 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2668 (LOI)->loi_read_lop.lop_num_pending, \
2669 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2672 /* This is called by osc_check_rpcs() to find which objects have pages that
2673 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/*
 * Selection order (each clause returns the first eligible object):
 *  1. objects with blocked (high-priority) locks,
 *  2. objects with RPC-ready pages,
 *  3. any object with queued writes when cache waiters exist,
 *  4. any queued object when the import is invalid, so it gets flushed.
 * The NULL-return path is elided from this listing.
 */
2674 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2677 /* First return objects that have blocked locks so that they
2678 * will be flushed quickly and other clients can get the lock,
2679 * then objects which have pages ready to be stuffed into RPCs */
2680 if (!list_empty(&cli->cl_loi_hp_ready_list))
2681 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2682 struct lov_oinfo, loi_hp_ready_item));
2683 if (!list_empty(&cli->cl_loi_ready_list))
2684 RETURN(list_entry(cli->cl_loi_ready_list.next,
2685 struct lov_oinfo, loi_ready_item));
2687 /* then if we have cache waiters, return all objects with queued
2688 * writes. This is especially important when many small files
2689 * have filled up the cache and not been fired into rpcs because
2690 * they don't pass the nr_pending/object threshold */
2691 if (!list_empty(&cli->cl_cache_waiters) &&
2692 !list_empty(&cli->cl_loi_write_list))
2693 RETURN(list_entry(cli->cl_loi_write_list.next,
2694 struct lov_oinfo, loi_write_item));
2696 /* then return all queued objects when we have an invalid import
2697 * so that they get flushed */
2698 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2699 if (!list_empty(&cli->cl_loi_write_list))
2700 RETURN(list_entry(cli->cl_loi_write_list.next,
2701 struct lov_oinfo, loi_write_item));
2702 if (!list_empty(&cli->cl_loi_read_list))
2703 RETURN(list_entry(cli->cl_loi_read_list.next,
2704 struct lov_oinfo, loi_read_item));
/*
 * osc_max_rpc_in_flight(): nonzero when the client is already at its RPC
 * in-flight limit for @loi.  A queued high-priority page (ASYNC_HP at the
 * head of either urgent list) raises the effective limit by one, so HP
 * work can still be sent when the normal limit has been reached.
 */
2709 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2711 struct osc_async_page *oap;
2714 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2715 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2716 struct osc_async_page, oap_urgent_item);
2717 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2720 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2721 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2722 struct osc_async_page, oap_urgent_item);
2723 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2726 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2729 /* called with the loi list lock held */
/*
 * osc_check_rpcs(): drive the RPC engine.  Repeatedly pick the next
 * eligible object via osc_next_loi() and fire write then read RPCs for
 * it, stopping when the in-flight limit is reached.  race_counter bounds
 * spinning when osc_send_oap_rpc() keeps returning 0 (make_ready backoff).
 * NOTE(review): several branches (rc handling after each send) are elided
 * from this listing.
 */
2730 static void osc_check_rpcs(struct client_obd *cli)
2732 struct lov_oinfo *loi;
2733 int rc = 0, race_counter = 0;
2736 while ((loi = osc_next_loi(cli)) != NULL) {
2737 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2739 if (osc_max_rpc_in_flight(cli, loi))
2742 /* attempt some read/write balancing by alternating between
2743 * reads and writes in an object. The makes_rpc checks here
2744 * would be redundant if we were getting read/write work items
2745 * instead of objects. we don't want send_oap_rpc to drain a
2746 * partial read pending queue when we're given this object to
2747 * do io on writes while there are cache waiters */
2748 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2749 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2750 &loi->loi_write_lop);
2758 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2759 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2760 &loi->loi_read_lop);
2769 /* attempt some inter-object balancing by issueing rpcs
2770 * for each object in turn */
2771 if (!list_empty(&loi->loi_hp_ready_item))
2772 list_del_init(&loi->loi_hp_ready_item);
2773 if (!list_empty(&loi->loi_ready_item))
2774 list_del_init(&loi->loi_ready_item);
2775 if (!list_empty(&loi->loi_write_item))
2776 list_del_init(&loi->loi_write_item);
2777 if (!list_empty(&loi->loi_read_item))
2778 list_del_init(&loi->loi_read_item);
2780 loi_list_maint(cli, loi);
2782 /* send_oap_rpc fails with 0 when make_ready tells it to
2783 * back off. llite's make_ready does this when it tries
2784 * to lock a page queued for write that is already locked.
2785 * we want to try sending rpcs from many objects, but we
2786 * don't want to spin failing with 0. */
2787 if (race_counter == 10)
2793 /* we're trying to queue a page in the osc so we're subject to the
2794 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2795 * If the osc's queued pages are already at that limit, then we want to sleep
2796 * until there is space in the osc's queue for us. We also may be waiting for
2797 * write credits from the OST if there are RPCs in flight that may return some
2798 * before we fall back to sync writes.
2800 * We need this to know our allocation was granted in the presence of signals */
/*
 * ocw_granted(): l_wait_event() condition — true when the waiter was
 * removed from cl_cache_waiters (grant arrived) or no RPCs remain in
 * flight (nothing left that could return grant).
 */
2801 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2805 client_obd_list_lock(&cli->cl_loi_list_lock);
2806 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2807 client_obd_list_unlock(&cli->cl_loi_list_lock);
2811 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2812 * grant or cache space. */
/*
 * osc_enter_cache(): account one page against the client's dirty-page and
 * grant limits before it may enter the write-back cache.  Fast path takes
 * the grant immediately; otherwise the caller blocks as a cache waiter
 * until grant is returned by a completing write.  Forces sync I/O (error
 * return path elided here) when caching is disabled or force_sync is set.
 */
2813 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2814 struct osc_async_page *oap)
2816 struct osc_cache_waiter ocw;
2817 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2820 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2821 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2822 cli->cl_dirty_max, obd_max_dirty_pages,
2823 cli->cl_lost_grant, cli->cl_avail_grant);
2825 /* force the caller to try sync io. this can jump the list
2826 * of queued writes and create a discontiguous rpc stream */
2827 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2828 loi->loi_ar.ar_force_sync)
2831 /* Hopefully normal case - cache space and write credits available */
2832 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2833 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2834 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2835 /* account for ourselves */
2836 osc_consume_write_grant(cli, &oap->oap_brw_page);
2840 /* It is safe to block as a cache waiter as long as there is grant
2841 * space available or the hope of additional grant being returned
2842 * when an in flight write completes. Using the write back cache
2843 * if possible is preferable to sending the data synchronously
2844 * because write pages can then be merged in to large requests.
2845 * The addition of this cache waiter will causing pending write
2846 * pages to be sent immediately. */
2847 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2848 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2849 cfs_waitq_init(&ocw.ocw_waitq);
2853 loi_list_maint(cli, loi);
2854 osc_check_rpcs(cli);
/* Drop the list lock while sleeping; ocw_granted() re-takes it. */
2855 client_obd_list_unlock(&cli->cl_loi_list_lock);
2857 CDEBUG(D_CACHE, "sleeping for cache space\n");
2858 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2860 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Still on the waiter list => woken without grant (e.g. a signal). */
2861 if (!list_empty(&ocw.ocw_entry)) {
2862 list_del(&ocw.ocw_entry);
/*
 * osc_get_lock(): check whether an extent [start, end] is covered by an
 * existing DLM lock — either the lock behind a caller-supplied @lockh or
 * the lock cached on the page's osc_async_page (*res).  Uses
 * ldlm_lock_fast_match(); when a valid handle was passed in, the extra
 * reference taken by the match is dropped again ("release" path).
 */
2871 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2872 void **res, int rw, obd_off start, obd_off end,
2873 struct lustre_handle *lockh, int flags)
2875 struct ldlm_lock *lock = NULL;
2876 int rc, release = 0;
2880 if (lockh && lustre_handle_is_used(lockh)) {
2881 /* if a valid lockh is passed, just check that the corresponding
2882 * lock covers the extent */
2883 lock = ldlm_handle2lock(lockh);
2886 struct osc_async_page *oap = *res;
2887 spin_lock(&oap->oap_lock);
2888 lock = oap->oap_ldlm_lock;
2890 LDLM_LOCK_GET(lock);
2891 spin_unlock(&oap->oap_lock);
2893 /* lock can be NULL in case of a race between obd_get_lock and
2894 * lock cancel, so don't try to match it in that case */
2895 if (unlikely(!lock))
2898 rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2899 if (release == 1 && rc == 1)
2900 /* if a valid lockh was passed, we just need to check
2901 * that the lock covers the page, no reference should be
2903 ldlm_lock_decref(lockh,
2904 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2905 LDLM_LOCK_PUT(lock);
/*
 * osc_prep_async_page(): initialize the per-page osc_async_page cookie for
 * @page at object offset @offset, wiring in the caller's ops/data and list
 * heads, and (unless OBD_PAGE_NO_CACHE) attach the page to the extent
 * cache for its lock.  NOTE(review): the early return of
 * size_round(sizeof(*oap)) is the "report cookie size" path; its guard
 * condition is elided from this listing — confirm against full source.
 */
2909 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2910 struct lov_oinfo *loi, cfs_page_t *page,
2911 obd_off offset, struct obd_async_page_ops *ops,
2912 void *data, void **res, int flags,
2913 struct lustre_handle *lockh)
2915 struct osc_async_page *oap;
2916 struct ldlm_res_id oid = {{0}};
2922 return size_round(sizeof(*oap));
2925 oap->oap_magic = OAP_MAGIC;
2926 oap->oap_cli = &exp->exp_obd->u.cli;
2929 oap->oap_caller_ops = ops;
2930 oap->oap_caller_data = data;
2932 oap->oap_page = page;
2933 oap->oap_obj_off = offset;
2935 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2936 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2937 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2938 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2940 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2942 spin_lock_init(&oap->oap_lock);
2944 /* If the page was marked as notcacheable - don't add to any locks */
2945 if (!(flags & OBD_PAGE_NO_CACHE)) {
2946 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2947 /* This is the only place where we can call cache_add_extent
2948 without oap_lock, because this page is locked now, and
2949 the lock we are adding it to is referenced, so cannot lose
2950 any pages either. */
2951 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2956 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/*
 * oap_from_cookie(): convert an opaque page cookie back to its
 * osc_async_page, validating the magic; ERR_PTR(-EINVAL) on corruption.
 */
2960 struct osc_async_page *oap_from_cookie(void *cookie)
2962 struct osc_async_page *oap = cookie;
2963 if (oap->oap_magic != OAP_MAGIC)
2964 return ERR_PTR(-EINVAL);
/*
 * osc_queue_async_io(): queue one prepared page for asynchronous I/O.
 * Rejects invalid imports and already-queued pages; on (non-NOQUOTA)
 * writes performs a quota check via lquota_chkdq() and enters the
 * write-back cache accounting (osc_enter_cache) before putting the page
 * on the object's pending list and kicking osc_check_rpcs().
 */
2968 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2969 struct lov_oinfo *loi, void *cookie,
2970 int cmd, obd_off off, int count,
2971 obd_flag brw_flags, enum async_flags async_flags)
2973 struct client_obd *cli = &exp->exp_obd->u.cli;
2974 struct osc_async_page *oap;
2978 oap = oap_from_cookie(cookie);
2980 RETURN(PTR_ERR(oap));
2982 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* A page already on a pending/urgent/rpc list must not be queued twice. */
2985 if (!list_empty(&oap->oap_pending_item) ||
2986 !list_empty(&oap->oap_urgent_item) ||
2987 !list_empty(&oap->oap_rpc_item))
2990 /* check if the file's owner/group is over quota */
2991 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2992 struct obd_async_page_ops *ops;
2999 ops = oap->oap_caller_ops;
3000 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
3001 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
3011 loi = lsm->lsm_oinfo[0];
3013 client_obd_list_lock(&cli->cl_loi_list_lock);
3016 oap->oap_page_off = off;
3017 oap->oap_count = count;
3018 oap->oap_brw_flags = brw_flags;
3019 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3020 if (libcfs_memory_pressure_get())
3021 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3022 spin_lock(&oap->oap_lock);
3023 oap->oap_async_flags = async_flags;
3024 spin_unlock(&oap->oap_lock);
3026 if (cmd & OBD_BRW_WRITE) {
3027 rc = osc_enter_cache(cli, loi, oap);
3029 client_obd_list_unlock(&cli->cl_loi_list_lock);
3034 osc_oap_to_pending(oap);
3035 loi_list_maint(cli, loi);
3037 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3040 osc_check_rpcs(cli);
3041 client_obd_list_unlock(&cli->cl_loi_list_lock);
3046 /* aka (~was & now & flag), but this is more clear :) */
/* True when @flag is being newly set: absent in @was, present in @now.
 * NOTE(review): the macro arguments are not parenthesized; callers must
 * pass simple expressions (all current call sites below do). */
3047 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/*
 * osc_set_async_flags(): raise async flags (ASYNC_READY / ASYNC_URGENT)
 * on an already-pending page.  Newly-urgent pages are put on the lop
 * urgent list (HP pages at the head) and the RPC engine is kicked.
 * oap_async_flags is only mutated under oap_lock (see comment below).
 */
3049 static int osc_set_async_flags(struct obd_export *exp,
3050 struct lov_stripe_md *lsm,
3051 struct lov_oinfo *loi, void *cookie,
3052 obd_flag async_flags)
3054 struct client_obd *cli = &exp->exp_obd->u.cli;
3055 struct loi_oap_pages *lop;
3056 struct osc_async_page *oap;
3060 oap = oap_from_cookie(cookie);
3062 RETURN(PTR_ERR(oap));
3065 * bug 7311: OST-side locking is only supported for liblustre for now
3066 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
3067 * implementation has to handle case where OST-locked page was picked
3068 * up by, e.g., ->writepage().
3070 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
3071 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
3074 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3078 loi = lsm->lsm_oinfo[0];
3080 if (oap->oap_cmd & OBD_BRW_WRITE) {
3081 lop = &loi->loi_write_lop;
3083 lop = &loi->loi_read_lop;
3086 client_obd_list_lock(&cli->cl_loi_list_lock);
3087 /* oap_lock provides atomic semantics of oap_async_flags access */
3088 spin_lock(&oap->oap_lock);
3089 if (list_empty(&oap->oap_pending_item))
3090 GOTO(out, rc = -EINVAL);
3092 if ((oap->oap_async_flags & async_flags) == async_flags)
3095 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3096 oap->oap_async_flags |= ASYNC_READY;
3098 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3099 list_empty(&oap->oap_rpc_item)) {
3100 if (oap->oap_async_flags & ASYNC_HP)
3101 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3103 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3104 oap->oap_async_flags |= ASYNC_URGENT;
3105 loi_list_maint(cli, loi);
3108 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3109 oap->oap_async_flags);
3111 spin_unlock(&oap->oap_lock);
3112 osc_check_rpcs(cli);
3113 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_queue_group_io(): queue a page onto an obd_io_group rather than the
 * normal pending list — pages land on lop_pending_group until
 * osc_trigger_group_io() moves them to pending.  ASYNC_GROUP_SYNC pages
 * register their completion record with the group via oig_add_one().
 */
3117 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
3118 struct lov_oinfo *loi,
3119 struct obd_io_group *oig, void *cookie,
3120 int cmd, obd_off off, int count,
3122 obd_flag async_flags)
3124 struct client_obd *cli = &exp->exp_obd->u.cli;
3125 struct osc_async_page *oap;
3126 struct loi_oap_pages *lop;
3130 oap = oap_from_cookie(cookie);
3132 RETURN(PTR_ERR(oap));
3134 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3137 if (!list_empty(&oap->oap_pending_item) ||
3138 !list_empty(&oap->oap_urgent_item) ||
3139 !list_empty(&oap->oap_rpc_item))
3143 loi = lsm->lsm_oinfo[0];
3145 client_obd_list_lock(&cli->cl_loi_list_lock);
3148 oap->oap_page_off = off;
3149 oap->oap_count = count;
3150 oap->oap_brw_flags = brw_flags;
3151 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3152 if (libcfs_memory_pressure_get())
3153 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3154 spin_lock(&oap->oap_lock);
3155 oap->oap_async_flags = async_flags;
3156 spin_unlock(&oap->oap_lock);
3158 if (cmd & OBD_BRW_WRITE)
3159 lop = &loi->loi_write_lop;
3161 lop = &loi->loi_read_lop;
3163 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
3164 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
3166 rc = oig_add_one(oig, &oap->oap_occ);
3169 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
3170 oap, oap->oap_page, rc);
3172 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_group_to_pending(): move every page parked on @lop's
 * lop_pending_group list onto the regular pending list
 * (osc_oap_to_pending) and refresh the object's list placement.
 */
3177 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3178 struct loi_oap_pages *lop, int cmd)
3180 struct list_head *pos, *tmp;
3181 struct osc_async_page *oap;
3183 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3184 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3185 list_del(&oap->oap_pending_item);
3186 osc_oap_to_pending(oap);
3188 loi_list_maint(cli, loi);
/*
 * osc_trigger_group_io(): release all grouped pages (both directions)
 * into the pending lists and kick osc_check_rpcs() to send them.
 */
3191 static int osc_trigger_group_io(struct obd_export *exp,
3192 struct lov_stripe_md *lsm,
3193 struct lov_oinfo *loi,
3194 struct obd_io_group *oig)
3196 struct client_obd *cli = &exp->exp_obd->u.cli;
3200 loi = lsm->lsm_oinfo[0];
3202 client_obd_list_lock(&cli->cl_loi_list_lock);
3204 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3205 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3207 osc_check_rpcs(cli);
3208 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_teardown_async_page(): undo the queuing of a page that has not yet
 * entered an RPC (-EBUSY if it has): release its cache/grant accounting,
 * remove it from the urgent and pending lists, and detach it from the
 * extent cache.
 */
3213 static int osc_teardown_async_page(struct obd_export *exp,
3214 struct lov_stripe_md *lsm,
3215 struct lov_oinfo *loi, void *cookie)
3217 struct client_obd *cli = &exp->exp_obd->u.cli;
3218 struct loi_oap_pages *lop;
3219 struct osc_async_page *oap;
3223 oap = oap_from_cookie(cookie);
3225 RETURN(PTR_ERR(oap));
3228 loi = lsm->lsm_oinfo[0];
3230 if (oap->oap_cmd & OBD_BRW_WRITE) {
3231 lop = &loi->loi_write_lop;
3233 lop = &loi->loi_read_lop;
3236 client_obd_list_lock(&cli->cl_loi_list_lock);
3238 if (!list_empty(&oap->oap_rpc_item))
3239 GOTO(out, rc = -EBUSY);
3241 osc_exit_cache(cli, oap, 0);
3242 osc_wake_cache_waiters(cli);
3244 if (!list_empty(&oap->oap_urgent_item)) {
3245 list_del_init(&oap->oap_urgent_item);
3246 spin_lock(&oap->oap_lock);
3247 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3248 spin_unlock(&oap->oap_lock);
3251 if (!list_empty(&oap->oap_pending_item)) {
3252 list_del_init(&oap->oap_pending_item);
3253 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3255 loi_list_maint(cli, loi);
3256 cache_remove_extent(cli->cl_cache, oap);
3258 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3260 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * osc_extent_blocking_cb(): LDLM blocking/cancel callback for extent
 * locks.  BLOCKING: cancel the lock.  CANCELING: for granted locks only,
 * purge the lock's pages from the extent cache and chain to the client's
 * optional cl_ext_lock_cancel_cb.  Guards against obviously bogus @data
 * pointers (low, non-NULL addresses).
 */
3264 int osc_extent_blocking_cb(struct ldlm_lock *lock,
3265 struct ldlm_lock_desc *new, void *data,
3268 struct lustre_handle lockh = { 0 };
3272 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
3273 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3278 case LDLM_CB_BLOCKING:
3279 ldlm_lock2handle(lock, &lockh);
3280 rc = ldlm_cli_cancel(&lockh);
3282 CERROR("ldlm_cli_cancel failed: %d\n", rc);
3284 case LDLM_CB_CANCELING: {
3286 ldlm_lock2handle(lock, &lockh);
3287 /* This lock wasn't granted, don't try to do anything */
3288 if (lock->l_req_mode != lock->l_granted_mode)
3291 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3294 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3295 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3296 lock, new, data,flag);
3305 EXPORT_SYMBOL(osc_extent_blocking_cb);
/*
 * osc_set_data_with_check(): attach @data (an inode on Linux) as the
 * lock's l_ast_data, complaining loudly if different live ast_data is
 * already set; also ORs LDLM_FL_NO_LRU from @flags into the lock.  A
 * stale handle (lock == NULL) indicates the client was evicted.
 */
3307 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3310 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3313 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3316 lock_res_and_lock(lock);
3317 #if defined (__KERNEL__) && defined (__linux__)
3318 /* Liang XXX: Darwin and Winnt checking should be added */
3319 if (lock->l_ast_data && lock->l_ast_data != data) {
3320 struct inode *new_inode = data;
3321 struct inode *old_inode = lock->l_ast_data;
/* A replaced ast_data is only legitimate if the old inode is being freed. */
3322 if (!(old_inode->i_state & I_FREEING))
3323 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3324 LASSERTF(old_inode->i_state & I_FREEING,
3325 "Found existing inode %p/%lu/%u state %lu in lock: "
3326 "setting data to %p/%lu/%u\n", old_inode,
3327 old_inode->i_ino, old_inode->i_generation,
3329 new_inode, new_inode->i_ino, new_inode->i_generation);
3332 lock->l_ast_data = data;
3333 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3334 unlock_res_and_lock(lock);
3335 LDLM_LOCK_PUT(lock);
/*
 * osc_change_cbdata(): run @replace over every lock on the object's
 * resource (built from the stripe md's id/group), passing @data.
 */
3338 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3339 ldlm_iterator_t replace, void *data)
3341 struct ldlm_res_id res_id;
3342 struct obd_device *obd = class_exp2obd(exp);
3344 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3345 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3349 /* find any ldlm lock of the inode in osc
/*
 * osc_find_cbdata(): iterate the object's resource with @replace; the
 * iterator stopping (LDLM_ITER_STOP) signals a matching lock was found.
 * The exact return-value mapping lines are elided from this listing.
 */
3353 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3354 ldlm_iterator_t replace, void *data)
3356 struct ldlm_res_id res_id;
3357 struct obd_device *obd = class_exp2obd(exp);
3360 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3361 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3362 if (rc == LDLM_ITER_STOP)
3364 if (rc == LDLM_ITER_CONTINUE)
/*
 * osc_enqueue_fini(): common completion for (a)sync enqueue.  For intent
 * enqueues aborted by the server, extract the real result from the DLM
 * reply's lock_policy_res1; on success record the lock in the extent
 * cache and invoke the caller's update callback oi_cb_up().
 */
3369 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3370 struct obd_info *oinfo, int intent, int rc)
3375 /* The request was created before ldlm_cli_enqueue call. */
3376 if (rc == ELDLM_LOCK_ABORTED) {
3377 struct ldlm_reply *rep;
3379 /* swabbed by ldlm_cli_enqueue() */
3380 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3381 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3383 LASSERT(rep != NULL);
3384 if (rep->lock_policy_res1)
3385 rc = rep->lock_policy_res1;
3389 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3390 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3391 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3392 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3393 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3397 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3399 /* Call the update callback. */
3400 rc = oinfo->oi_cb_up(oinfo, rc);
/*
 * osc_enqueue_interpret(): ptlrpc interpret callback for async enqueue.
 * Finishes the DLM enqueue (ldlm_cli_enqueue_fini, swabbing the LVB),
 * runs osc_enqueue_fini(), then immediately decrefs the lock — async
 * enqueues do not keep the lock referenced (see comment above
 * osc_enqueue about releasing locks right after they are obtained).
 */
3404 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3407 struct osc_enqueue_args *aa = data;
3408 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3409 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3410 struct ldlm_lock *lock;
3412 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3414 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3416 /* Complete obtaining the lock procedure. */
3417 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3419 &aa->oa_oi->oi_flags,
3420 &lsm->lsm_oinfo[0]->loi_lvb,
3421 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3422 lustre_swab_ost_lvb,
3423 aa->oa_oi->oi_lockh, rc);
3425 /* Complete osc stuff. */
3426 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3428 /* Release the lock for async request. */
3429 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3430 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3432 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3433 aa->oa_oi->oi_lockh, req, aa);
3434 LDLM_LOCK_PUT(lock);
3438 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3439 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3440 * other synchronous requests, however keeping some locks and trying to obtain
3441 * others may take a considerable amount of time in a case of ost failure; and
3442 * when other sync requests do not get released lock from a client, the client
3443 * is excluded from the cluster -- such scenarious make the life difficult, so
3444 * release locks just after they are obtained. */
/*
 * osc_enqueue(): obtain an extent lock for the range in @oinfo.  Rounds
 * the extent to page boundaries, first tries to match an existing lock
 * (a PW lock also satisfies a PR request), and only then sends a new
 * enqueue — synchronously, or via @rqset with osc_enqueue_interpret as
 * the reply handler.  NOTE(review): several lines (match-success branch
 * structure, no_match label, RETURNs) are elided from this listing.
 */
3445 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3446 struct ldlm_enqueue_info *einfo,
3447 struct ptlrpc_request_set *rqset)
3449 struct ldlm_res_id res_id;
3450 struct obd_device *obd = exp->exp_obd;
3451 struct ldlm_reply *rep;
3452 struct ptlrpc_request *req = NULL;
3453 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3458 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3459 oinfo->oi_md->lsm_object_gr, &res_id);
3460 /* Filesystem lock extents are extended to page boundaries so that
3461 * dealing with the page cache is a little smoother. */
3462 oinfo->oi_policy.l_extent.start -=
3463 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3464 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3466 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3469 /* Next, search for already existing extent locks that will cover us */
3470 /* If we're trying to read, we also search for an existing PW lock. The
3471 * VFS and page cache already protect us locally, so lots of readers/
3472 * writers can share a single PW lock.
3474 * There are problems with conversion deadlocks, so instead of
3475 * converting a read lock to a write lock, we'll just enqueue a new
3478 * At some point we should cancel the read lock instead of making them
3479 * send us a blocking callback, but there are problems with canceling
3480 * locks out from other users right now, too. */
3481 mode = einfo->ei_mode;
3482 if (einfo->ei_mode == LCK_PR)
3484 mode = ldlm_lock_match(obd->obd_namespace,
3485 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3486 einfo->ei_type, &oinfo->oi_policy, mode,
3489 /* addref the lock only if not async requests and PW lock is
3490 * matched whereas we asked for PR. */
3491 if (!rqset && einfo->ei_mode != mode)
3492 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR)
3493 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3496 /* I would like to be able to ASSERT here that rss <=
3497 * kms, but I can't, for reasons which are explained in
3501 /* We already have a lock, and it's referenced */
3502 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3504 /* For async requests, decref the lock. */
3505 if (einfo->ei_mode != mode)
3506 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3508 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
/* No existing lock matched: build an enqueue request (intent path
 * allocates the DLM reply and LVB buffers). */
3516 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3517 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
3518 [DLM_LOCKREQ_OFF + 1] = 0 };
3520 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3524 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3525 size[DLM_REPLY_REC_OFF] =
3526 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3527 ptlrpc_req_set_repsize(req, 3, size);
3530 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3531 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3533 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3534 &oinfo->oi_policy, &oinfo->oi_flags,
3535 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3536 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3537 lustre_swab_ost_lvb, oinfo->oi_lockh,
3541 struct osc_enqueue_args *aa;
3542 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3543 aa = ptlrpc_req_async_args(req);
3548 req->rq_interpret_reply = osc_enqueue_interpret;
3549 ptlrpc_set_add_req(rqset, req);
3550 } else if (intent) {
3551 ptlrpc_req_finished(req);
3556 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3558 ptlrpc_req_finished(req);
/* Match an existing granted extent lock covering @policy on @lsm's
 * object without enqueueing a new one.  A PR request may be satisfied
 * by a PW lock; unless LDLM_FL_TEST_LOCK was asked, the reference is
 * converted (addref PR, decref PW) so callers hold the requested mode. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh,
        struct ldlm_res_id res_id;
        struct obd_device *obd = exp->exp_obd;
        int lflags = *flags;
        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
        /* Fault-injection point for testing lock-match failure paths. */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;
        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock. The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
                             &res_id, type, policy, rc, lockh);
        osc_set_data_with_check(lockh, data, lflags);
        if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                /* We asked for PR but matched PW: take a PR ref and
                 * drop the extra PW ref taken by the match. */
                ldlm_lock_addref(lockh, LCK_PR);
                ldlm_lock_decref(lockh, LCK_PW);
        if (n_matches != NULL)
/* Release one reference on an extent lock.  GROUP locks are actively
 * cancelled (decref_and_cancel); other modes just drop the reference
 * and let LRU/blocking callbacks reap the lock later. */
static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh, int flags,
        if (unlikely(mode == LCK_GROUP))
                ldlm_lock_decref_and_cancel(lockh, mode);
        ldlm_lock_decref(lockh, mode);
/* Cancel all unused locks on this object's resource (NULL resource name
 * would mean the whole namespace; here the name is built from @lsm). */
static int osc_cancel_unused(struct obd_export *exp,
                             struct lov_stripe_md *lsm, int flags, void *opaque)
        struct obd_device *obd = class_exp2obd(exp);
        struct ldlm_res_id res_id, *resp = NULL;
        resp = osc_build_res_name(lsm->lsm_object_id,
                                  lsm->lsm_object_gr, &res_id);
        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Move this object's unused locks into (or out of) the namespace LRU,
 * depending on @join.  Mirrors osc_cancel_unused()'s resource naming. */
static int osc_join_lru(struct obd_export *exp,
                        struct lov_stripe_md *lsm, int join)
        struct obd_device *obd = class_exp2obd(exp);
        struct ldlm_res_id res_id, *resp = NULL;
        resp = osc_build_res_name(lsm->lsm_object_id,
                                  lsm->lsm_object_gr, &res_id);
        return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Reply interpreter for async OST_STATFS RPCs: unpacks the obd_statfs
 * reply, refreshes the per-OSC creator flags (DEGRADED/RDONLY/NOSPC)
 * under oscc_lock, copies the result to the caller's buffer and invokes
 * the caller's completion callback. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
        struct osc_async_args *aa = data;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obd_statfs *msfs;
        /* The request has in fact never been sent
         * due to issues at a higher level (LOV).
         * Exit immediately since the caller is
         * aware of the problem and takes care
         * of the clean up */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        CERROR("Can't unpack obd_statfs\n");
        GOTO(out, rc = -EPROTO);
        /* Reinitialize the RDONLY and DEGRADED flags at the client
         * on each statfs, so they don't stay set permanently. */
        spin_lock(&cli->cl_oscc.oscc_lock);
        if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
        if (unlikely(msfs->os_state & OS_STATE_READONLY))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
        /* Add a bit of hysteresis so this flag isn't continually flapping,
         * and ensure that new files don't get extremely fragmented due to
         * only a small amount of available space in the filesystem.
         * We want to set the NOSPC flag when there is less than ~0.1% free
         * and clear it when there is at least ~0.2% free space, so:
         *                 avail < ~0.1% max          max = avail + used
         *            1025 * avail < avail + used     used = blocks - free
         *            1024 * avail < used
         *            1024 * avail < blocks - free
         *                   avail < ((blocks - free) >> 10)
         *
         * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
         * lose that amount of space so in those cases we report no space left
         * if there is less than 1 GB left. */
        used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
        if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
                     ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
        else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
                     (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
        spin_unlock(&cli->cl_oscc.oscc_lock);
        /* Hand the unpacked statfs back to the caller, then notify it. */
        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue an asynchronous OST_STATFS RPC; the reply is handled by
 * osc_statfs_interpret() via @rqset.  @max_age is currently unused on
 * the wire (see comment below). */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit). Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not block waiting for statfs,
                 * to avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: send the RPC, wait, unpack the reply into
 * @osfs.  Takes its own import reference because the call may race
 * with disconnect (see Bug15684 note below). */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        struct obd_import *imp = NULL;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        /* Since the request might also come from lprocfs, we need to
         * serialize with client_disconnect_export (Bug15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit). Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        class_import_put(imp);
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not block waiting for statfs,
                 * to avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        rc = ptlrpc_queue_wait(req);
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        CERROR("Can't unpack obd_statfs\n");
        GOTO(out, rc = -EPROTO);
        memcpy(osfs, msfs, sizeof(*osfs));
        ptlrpc_req_finished(req);
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        int rc = 0, lum_size;
        struct lov_user_ost_data_v1 *lmm_objects;
        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        memset(&lum, 0x00, sizeof(lum));
        if (copy_from_user(&lum, lump, lum_size))
        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                /* v1 and v3 place lmm_objects at different offsets. */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
        /* OSC is always single-stripe from the user's point of view. */
        lumk->lmm_magic = lum.lmm_magic;
        lumk->lmm_stripe_count = 1;
        lumk->lmm_object_id = lsm->lsm_object_id;
        if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
            (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
                /* lsm is not in host byte order, so the magic and stripe
                 * count must be swabbed to the same order before copy-out */
                __swab32s(&lumk->lmm_magic);
                __swab16s(&lumk->lmm_stripe_count);
                lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
                if (lum.lmm_stripe_count > 0)
                        lustre_swab_lov_user_md_objects(
                                (struct lov_user_md_v1*)lumk);
        if (copy_to_user(lump, lumk, lum_size))
        OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device.  Pins the module for the call's
 * duration (try_module_get/module_put) so it cannot be unloaded while
 * an ioctl is in flight. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Present this single OST as a degenerate one-target LOV. */
                struct lov_desc *desc;
                struct obd_uuid uuid;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);
                data = (struct obd_ioctl_data *)buf;
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
                err = copy_to_user((void *)uarg, buf, len);
                obd_ioctl_freedata(buf, len);
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
        case OBD_IOC_DESTROY: {
                /* Object destruction is root-only. */
                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                        GOTO (out, err = -EPERM);
                oa = &data->ioc_obdo1;
                GOTO(out, err = -EINVAL);
                oa->o_valid |= OBD_MD_FLGROUP;
                err = osc_destroy(exp, oa, NULL, NULL, NULL);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        module_put(THIS_MODULE);
/* obd get_info handler: answers key/value queries, either locally
 * (lock-to-stripe, rpcsize) or via an OST_GET_INFO RPC (last_id,
 * fiemap).  @vallen is in/out: callers pass buffer size, get back the
 * length written. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
        if (!vallen || !val)
        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* OSC objects are single-striped: stripe index is 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
        } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
                struct client_obd *cli = &exp->exp_obd->u.cli;
                __u64 *rpcsize = val;
                LASSERT(*vallen == sizeof(__u64));
                *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* Ask the OST for the last allocated object id. */
                struct ptlrpc_request *req;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                *((obd_id *)val) = *reply;
                ptlrpc_req_finished(req);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* Forward a FIEMAP (extent mapping) query to the OST. */
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *bufs[2] = { NULL, key };
                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
                                           lustre_swab_fiemap);
                if (reply == NULL) {
                        CERROR("Can't unpack FIEMAP reply.\n");
                        GOTO(out1, rc = -EPROTO);
                memcpy(val, reply, *vallen);
                ptlrpc_req_finished(req);
/* Reply interpreter for the KEY_MDS_CONN set_info RPC: connect the
 * llog initiator and mark the import as a server-style (pingable,
 * server-timeout) connection -- this OSC belongs to an MDS. */
static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
        struct llog_ctxt *ctxt;
        struct obd_import *imp = req->rq_import;
        ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
        rc = llog_initiator_connect(ctxt);
        CERROR("cannot establish connection for "
               "ctxt %p: %d\n", ctxt, rc);
        llog_ctxt_put(ctxt);
        spin_lock(&imp->imp_lock);
        imp->imp_server_timeout = 1;
        imp->imp_pingable = 1;
        spin_unlock(&imp->imp_lock);
        CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* obd set_info handler.  Some keys are handled purely locally
 * (NEXT_ID, INIT_RECOV, CHECKSUM); everything else is forwarded to the
 * OST as an OST_SET_INFO RPC, either via @set or via ptlrpcd for
 * GRANT_SHRINK. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct obd_device *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
        if (KEY_IS(KEY_NEXT_ID)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                if (vallen != sizeof(obd_id))
                /* avoid race between allocate new object and set next id
                 * from ll_sync thread */
                spin_lock(&oscc->oscc_lock);
                new_val = *((obd_id*)val) + 1;
                /* only move next_id forwards, never backwards */
                if (new_val > oscc->oscc_next_id)
                        oscc->oscc_next_id = new_val;
                spin_unlock(&oscc->oscc_lock);
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       oscc->oscc_next_id);
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
        /* GRANT_SHRINK is the only key allowed without a request set. */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
         */
        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
        if (KEY_IS(KEY_MDS_CONN))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        else if (KEY_IS(KEY_GRANT_SHRINK))
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                ptlrpc_req_finished(req);
                *oa = ((struct ost_body *)val)->oa;
                ptlrpc_req_set_repsize(req, 2, size);
                /* GRANT_SHRINK requests run under ptlrpcd, not @set. */
                ptlrpcd_add_req(req);
        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);
/* llog operations for the size-replication context: only cancel is
 * needed on the client side. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
/* Filled in at module init from llog_lvfs_ops (see osc_init()). */
static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts for this OSC (MDS->OST originator and
 * size-replication), reading/writing the catalog list on @disk_obd
 * under obd_llog_cat_process to serialize with other users. */
static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
        struct llog_catid catid;
        static char name[32] = CATLIST;
        mutex_down(&disk_obd->obd_llog_cat_process);
        rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
        CERROR("rc: %d\n", rc);
        GOTO(out_unlock, rc);
        CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
               obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
               catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
                        &catid.lci_logid, &osc_mds_ost_orig_logops);
        CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
                        &osc_size_repl_logops);
        /* On failure, tear down the first context again. */
        struct llog_ctxt *ctxt =
                llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
        CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        CERROR("osc '%s' tgt '%s' rc=%d\n",
               obd->obd_name, disk_obd->obd_name, rc);
        CERROR("logid "LPX64":0x%x\n",
               catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
        rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
        CERROR("rc: %d\n", rc);
        mutex_up(&disk_obd->obd_llog_cat_process);
/* Tear down both llog contexts created by osc_llog_init(). */
static int osc_llog_finish(struct obd_device *obd, int count)
        struct llog_ctxt *ctxt;
        int rc = 0, rc2 = 0;
        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
        rc = llog_cleanup(ctxt);
        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        rc2 = llog_cleanup(ctxt);
/* On reconnect, recompute the grant to request from the server and
 * reset the lost-grant counter, all under cl_loi_list_lock. */
static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
        struct client_obd *cli = &obd->u.cli;
        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* "a + b ?: c" binds as "(a + b) ?: c" (GCC extension):
                 * if the client holds no grant at all, ask for two RPCs'
                 * worth as a starting allowance. */
                data->ocd_grant = cli->cl_avail_grant + cli->cl_dirty ?:
                                  2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                /* NOTE(review): the argument order (cl_dirty before
                 * cl_avail_grant) looks swapped relative to the format
                 * string -- debug output only, but verify. */
                CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
                       "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
                       cli->cl_dirty, cli->cl_avail_grant, lost_grant);
                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant);
/* Disconnect from the OST: flush pending llog cancels on the last
 * connection, disconnect the export, then (only once the import is
 * gone) remove this client from the grant-shrink list -- see the
 * ordering rationale below (BUG18662). */
static int osc_disconnect(struct obd_export *exp)
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt;
        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (obd->u.cli.cl_conn_count == 1) {
                /* Flush any remaining cancel messages out to the
                llog_sync(ctxt, exp);
        llog_ctxt_put(ctxt);
        CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
        rc = client_disconnect_export(exp);
        /*
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *   ptlrpc_connect_interrupt
         *       add this client to shrink list
         *                                    Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
/* React to import state transitions (disconnect, inactive, invalidate,
 * active, OCD negotiation), keeping the object-creator flags and the
 * grant accounting consistent, and notifying the obd observer. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
        struct client_obd *cli;
        LASSERT(imp->imp_obd == obd);
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                /* Grant state is meaningless across a disconnect. */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                ptlrpc_import_setasync(imp, -1);
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                /* Drop all local locks; the server state is gone. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                CDEBUG(D_INFO, "notify server \n");
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;
                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
                ptlrpc_import_setasync(imp, 1);
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
        CERROR("Unknown import event %d\n", event);
/* determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information
 *
 * return values:
 *         zero   - the lock can't be canceled
 *         other  - ok to cancel */
static int osc_cancel_for_recovery(struct ldlm_lock *lock)
        check_res_locked(lock->l_resource);
        /* GROUP locks and non-extent locks are never cancelled here. */
        if (lock->l_granted_mode == LCK_GROUP ||
            lock->l_resource->lr_type != LDLM_EXTENT)
        /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
        if (lock->l_granted_mode == LCK_PR ||
            lock->l_granted_mode == LCK_CR)
/* Device setup: initialize the client obd, lprocfs entries, the
 * request pool for BRW RPCs, the page cache, the grant-shrink list and
 * the recovery-cancel callback. */
int osc_setup(struct obd_device *obd, obd_count len, void *buf)
        rc = ptlrpcd_addref();
        rc = client_obd_setup(obd, len, buf);
        struct lprocfs_static_vars lvars = { 0 };
        struct client_obd *cli = &obd->u.cli;
        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
        lprocfs_osc_init_vars(&lvars);
        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                lproc_osc_attach_seqstat(obd);
                ptlrpc_lprocfs_register_obd(obd);
        /* We need to allocate a few requests more, because
           brw_interpret tries to create new requests before freeing
           previous ones. Ideally we want to have 2x max_rpcs_in_flight
           reserved, but I'm afraid that might be too much wasted RAM
           in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    ptlrpc_add_rqs_to_pool);
        cli->cl_cache = cache_create(obd);
        if (!cli->cl_cache) {
        CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        sema_init(&cli->cl_grant_sem, 1);
        /* Allow the DLM to cancel idle read locks before replay. */
        ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* Staged pre-cleanup: deactivate the import early, then (at the
 * exports stage) destroy a never-connected import along with its
 * request pool, and finish llog cleanup. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                down_write(&obd->u.cli.cl_sem);
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                up_write(&obd->u.cli.cl_sem);
                rc = obd_llog_finish(obd, 0);
                CERROR("failed to cleanup llogging subsystems\n");
        case OBD_CLEANUP_SELF_EXP:
        case OBD_CLEANUP_OBD:
/* Final cleanup: unregister lprocfs, release quota cache, destroy the
 * page cache and the client obd state. */
int osc_cleanup(struct obd_device *obd)
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);
        cache_destroy(obd->u.cli.cl_cache);
        rc = client_obd_cleanup(obd);
/* Register a page-removal callback with the OSC page cache.  No-op on
 * a server-side (cache-less) instance. */
static int osc_register_page_removal_cb(struct obd_device *obd,
                                        obd_page_removal_cb_t func,
                                        obd_pin_extent_cb pin_cb)
        /* this server - not need init */
        return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
/* Unregister a previously registered page-removal callback. */
static int osc_unregister_page_removal_cb(struct obd_device *obd,
                                          obd_page_removal_cb_t func)
        return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
/* Install the (single) extent-lock-cancel callback; asserts none was
 * installed before.  No-op on a server-side instance. */
static int osc_register_lock_cancel_cb(struct obd_device *obd,
                                       obd_lock_cancel_cb cb)
        LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
        /* this server - not need init */
        obd->u.cli.cl_ext_lock_cancel_cb = cb;
/* Remove the extent-lock-cancel callback; errors out if a different
 * callback is currently installed. */
static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
                                         obd_lock_cancel_cb cb)
        if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
                CERROR("Unregistering cancel cb %p, while only %p was "
                       obd->u.cli.cl_ext_lock_cancel_cb);
        obd->u.cli.cl_ext_lock_cancel_cb = NULL;
/* Apply a configuration log record: route PARAM_OSC tunables to the
 * matching lprocfs variables. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
        struct lustre_cfg *lcfg = buf;
        struct lprocfs_static_vars lvars = { 0 };
        lprocfs_osc_init_vars(&lvars);
        rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/* Method table binding the generic obd interface to the OSC
 * implementations above (and to generic client_* helpers where no OSC
 * specialization is needed). */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_create_async         = osc_create_async,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_get_lock             = osc_get_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_register_page_removal_cb     = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb   = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb      = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb    = osc_unregister_lock_cancel_cb,
/* Module init: hook up the quota interface, register the OSC obd type
 * and prepare the originator llog operations vector used by
 * osc_llog_init(). */
int __init osc_init(void)
        struct lprocfs_static_vars lvars = { 0 };
        lprocfs_osc_init_vars(&lvars);
        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);
        rc = class_register_type(&osc_obd_ops, lvars.module_vars,
        /* On registration failure, drop the quota symbol reference. */
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);
        /* Start from the generic lvfs llog ops and override the
         * originator-specific entry points. */
        osc_mds_ost_orig_logops = llog_lvfs_ops;
        osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
        osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
        osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
        osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: release the quota interface and unregister the type. */
static void /*__exit*/ osc_exit(void)
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);
        class_unregister_type(LUSTRE_OSC_NAME);
/* Kernel module metadata and init/exit registration. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);