1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
78 atomic_t osc_resend_time;
80 /* Pack OSC object metadata for disk storage (LE byte order). */
/* NOTE(review): this listing is subsampled; branch structure around the
 * free/alloc paths is not visible.  Visible behavior: the wire md is a
 * single lov_mds_md, freed on the caller's request, allocated on demand,
 * and the object id is stored little-endian.  Confirm against full source. */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82 struct lov_stripe_md *lsm)
/* size of one on-disk md record (single stripe) */
87 lmm_size = sizeof(**lmmp);
92 OBD_FREE(*lmmp, lmm_size);
98 OBD_ALLOC(*lmmp, lmm_size);
/* an object id of 0 is never valid on the wire */
104 LASSERT(lsm->lsm_object_id);
105 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* NOTE(review): subsampled listing -- error-return lines and some braces are
 * missing here.  Visible behavior: validate the incoming lov_mds_md (size and
 * non-zero object id), free the in-memory md when lmm == NULL, otherwise
 * allocate a single-stripe lov_stripe_md plus its lov_oinfo and fill it
 * from the little-endian wire record. */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113 struct lov_mds_md *lmm, int lmm_bytes)
/* reject wire records shorter than one lov_mds_md */
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == 0) {
127 CERROR("lov_mds_md: zero lmm_object_id\n");
/* OSC always has exactly one stripe */
132 lsm_size = lov_stripe_md_size(1);
/* caller passes lmm == NULL to release a previously unpacked md */
136 if (*lsmp != NULL && lmm == NULL) {
137 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 OBD_FREE(*lsmp, lsm_size);
144 OBD_ALLOC(*lsmp, lsm_size);
147 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* unwind the outer allocation if the oinfo allocation fails */
148 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149 OBD_FREE(*lsmp, lsm_size);
152 loi_init((*lsmp)->lsm_oinfo[0]);
156 /* XXX zero *lsmp? */
157 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158 LASSERT((*lsmp)->lsm_object_id);
161 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Reply callback for async OST_GETATTR: unpack the ost_body from the reply,
 * copy the returned attributes into the caller's obd_info, then invoke the
 * caller's completion callback.  NOTE(review): subsampled listing -- the
 * rc/error branch structure is only partially visible. */
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
169 struct ost_body *body;
170 struct osc_async_args *aa = data;
/* swab the reply body if it arrived in foreign byte order */
176 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
177 lustre_swab_ost_body);
179 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
182 /* This should really be sent by the OST */
183 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
184 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
186 CERROR("can't unpack ost_body\n");
/* on failure mark no attributes valid so callers don't read garbage */
188 aa->aa_oi->oi_oa->o_valid = 0;
191 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR RPC on @set; the reply is handled by
 * osc_getattr_interpret() above.  NOTE(review): subsampled listing -- the
 * ENOMEM check after prep_req and the aa initialization lines are missing. */
195 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
196 struct ptlrpc_request_set *set)
198 struct ptlrpc_request *req;
199 struct ost_body *body;
200 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
201 struct osc_async_args *aa;
204 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
205 OST_GETATTR, 2, size,NULL);
/* pack the caller's obdo into the request body */
209 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
210 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
212 ptlrpc_req_set_repsize(req, 2, size);
213 req->rq_interpret_reply = osc_getattr_interpret;
/* async args must fit inside the request's embedded scratch space */
215 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
216 aa = ptlrpc_req_async_args(req);
219 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send the request, wait for the reply, and copy
 * the returned attributes into oinfo->oi_oa.  NOTE(review): subsampled
 * listing -- some error-branch lines are missing between the visible ones. */
223 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
225 struct ptlrpc_request *req;
226 struct ost_body *body;
227 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
231 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
232 OST_GETATTR, 2, size, NULL);
236 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
237 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
239 ptlrpc_req_set_repsize(req, 2, size);
/* blocking send; rc != 0 means the RPC itself failed */
241 rc = ptlrpc_queue_wait(req);
243 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
247 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
248 lustre_swab_ost_body);
250 CERROR ("can't unpack ost_body\n");
251 GOTO (out, rc = -EPROTO);
254 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
255 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
257 /* This should really be sent by the OST */
258 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
259 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* single exit: always release the request */
263 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and read the (possibly updated) attributes back from the reply.
 * NOTE(review): subsampled listing -- error branches are partially missing. */
267 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
268 struct obd_trans_info *oti)
270 struct ptlrpc_request *req;
271 struct ost_body *body;
272 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
276 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
277 OST_SETATTR, 2, size, NULL);
281 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
282 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
284 ptlrpc_req_set_repsize(req, 2, size);
286 rc = ptlrpc_queue_wait(req);
/* unpack the reply; -EPROTO if the body is missing or malformed */
290 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
291 lustre_swab_ost_body);
293 GOTO(out, rc = -EPROTO);
295 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
299 ptlrpc_req_finished(req);
/* Reply callback for async OST_SETATTR: unpack the reply body, copy the
 * returned attributes back, then run the caller's completion callback. */
303 static int osc_setattr_interpret(struct ptlrpc_request *req,
306 struct ost_body *body;
307 struct osc_async_args *aa = data;
313 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
314 lustre_swab_ost_body);
316 CERROR("can't unpack ost_body\n");
317 GOTO(out, rc = -EPROTO);
320 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
322 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.  Two send paths are visible: fire-and-forget via
 * ptlrpcd_add_req() (used for MDS->OST setattr, per the comment at the
 * original line 355), or add to @rqset with osc_setattr_interpret as the
 * reply handler.  NOTE(review): subsampled listing -- the branch selecting
 * between the two paths and the 2.0-server size adjustment are not fully
 * visible; confirm against the full source. */
326 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
327 struct obd_trans_info *oti,
328 struct ptlrpc_request_set *rqset)
330 struct ptlrpc_request *req;
331 struct ost_body *body;
332 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
334 struct osc_async_args *aa;
337 if (osc_exp_is_2_0_server(exp)) {
341 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
342 OST_SETATTR, bufcount, size, NULL);
346 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
/* carry the llog cancel cookie along when the caller supplied one */
348 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
350 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
353 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
354 ptlrpc_req_set_repsize(req, 2, size);
355 /* do mds to ost setattr asynchronouly */
357 /* Do not wait for response. */
358 ptlrpcd_add_req(req);
360 req->rq_interpret_reply = osc_setattr_interpret;
362 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
363 aa = ptlrpc_req_async_args(req);
366 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST (synchronous OST_CREATE).  Allocates a
 * temporary single-stripe lsm if the caller didn't supply one, sends the
 * create, copies the returned attributes into @oa, records the transno and
 * llog cookie in @oti, and stores the new object id in the lsm.
 * NOTE(review): subsampled listing -- *ea handling and several error
 * branches are not fully visible. */
372 int osc_real_create(struct obd_export *exp, struct obdo *oa,
373 struct lov_stripe_md **ea, struct obd_trans_info *oti)
375 struct ptlrpc_request *req;
376 struct ost_body *body;
377 struct lov_stripe_md *lsm;
378 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
/* allocate a stripe md if the caller didn't pass one in *ea */
387 rc = obd_alloc_memmd(exp, &lsm);
392 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
393 OST_CREATE, 2, size, NULL);
395 GOTO(out, rc = -ENOMEM);
397 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
398 lustre_set_wire_obdo(&body->oa, oa);
400 ptlrpc_req_set_repsize(req, 2, size);
/* DELORPHAN (orphan cleanup after recovery) must not be replayed */
401 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
402 oa->o_flags == OBD_FL_DELORPHAN) {
404 "delorphan from OST integration");
405 /* Don't resend the delorphan req */
406 req->rq_no_resend = req->rq_no_delay = 1;
409 rc = ptlrpc_queue_wait(req);
413 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
414 lustre_swab_ost_body);
416 CERROR ("can't unpack ost_body\n");
417 GOTO (out_req, rc = -EPROTO);
420 lustre_get_wire_obdo(oa, &body->oa);
422 /* This should really be sent by the OST */
423 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
424 oa->o_valid |= OBD_MD_FLBLKSZ;
426 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
427 * have valid lsm_oinfo data structs, so don't go touching that.
428 * This needs to be fixed in a big way.
430 lsm->lsm_object_id = oa->o_id;
/* remember the transaction number for replay ordering */
434 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
436 if (oa->o_valid & OBD_MD_FLCOOKIE) {
437 if (!oti->oti_logcookies)
438 oti_alloc_cookies(oti, 1);
439 *oti->oti_logcookies = oa->o_lcookie;
443 CDEBUG(D_HA, "transno: "LPD64"\n",
444 lustre_msg_get_transno(req->rq_repmsg));
446 ptlrpc_req_finished(req);
/* error path: release the lsm we allocated above */
449 obd_free_memmd(exp, &lsm);
/* Reply callback for async OST_PUNCH (truncate): unpack the reply body,
 * copy the returned attributes back, then run the caller's callback. */
453 static int osc_punch_interpret(struct ptlrpc_request *req,
456 struct ost_body *body;
457 struct osc_async_args *aa = data;
463 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
464 lustre_swab_ost_body);
466 CERROR ("can't unpack ost_body\n");
467 GOTO(out, rc = -EPROTO);
470 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
472 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_PUNCH RPC on @rqset to truncate/punch the
 * extent in oinfo->oi_policy.  The extent start/end ride in the obdo's
 * size/blocks fields (see the overload comment at the original line 502). */
476 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
477 struct obd_trans_info *oti,
478 struct ptlrpc_request_set *rqset)
480 struct ptlrpc_request *req;
481 struct osc_async_args *aa;
482 struct ost_body *body;
483 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
491 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
492 OST_PUNCH, 2, size, NULL);
496 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
497 ptlrpc_at_set_req_timeout(req);
499 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
500 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
502 /* overload the size and blocks fields in the oa with start/end */
503 body->oa.o_size = oinfo->oi_policy.l_extent.start;
504 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
505 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
507 ptlrpc_req_set_repsize(req, 2, size);
509 req->rq_interpret_reply = osc_punch_interpret;
510 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
511 aa = ptlrpc_req_async_args(req);
513 ptlrpc_set_add_req(rqset, req);
/* Reply callback for async OST_SYNC: unpack the reply body, copy it into the
 * caller's obdo, then run the completion callback.
 * NOTE(review): this copies body->oa with a plain struct assignment while
 * the sibling interpreters use lustre_get_wire_obdo() -- looks inconsistent;
 * confirm against the full source whether this is intentional here. */
518 static int osc_sync_interpret(struct ptlrpc_request *req,
521 struct ost_body *body;
522 struct osc_async_args *aa = data;
528 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529 lustre_swab_ost_body);
531 CERROR ("can't unpack ost_body\n");
532 GOTO(out, rc = -EPROTO);
535 *aa->aa_oi->oi_oa = body->oa;
537 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_SYNC RPC on @set, asking the OST to commit the
 * byte range [start, end] to disk.  As with osc_punch, the range rides in
 * the obdo's size/blocks fields. */
541 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
542 obd_size start, obd_size end,
543 struct ptlrpc_request_set *set)
545 struct ptlrpc_request *req;
546 struct ost_body *body;
547 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
548 struct osc_async_args *aa;
556 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
557 OST_SYNC, 2, size, NULL);
561 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
562 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
564 /* overload the size and blocks fields in the oa with start/end */
565 body->oa.o_size = start;
566 body->oa.o_blocks = end;
567 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
569 ptlrpc_req_set_repsize(req, 2, size);
570 req->rq_interpret_reply = osc_sync_interpret;
572 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
573 aa = ptlrpc_req_async_args(req);
576 ptlrpc_set_add_req(set, req);
580 /* Find and cancel locally locks matched by @mode in the resource found by
581 * @objid. Found locks are added into @cancel list. Returns the amount of
582 * locks added to @cancels list. */
583 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
584 struct list_head *cancels, ldlm_mode_t mode,
587 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
588 struct ldlm_res_id res_id;
589 struct ldlm_resource *res;
/* build the DLM resource name from object id + group */
593 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
594 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
/* collect matching local locks into @cancels without sending RPCs yet */
598 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
599 lock_flags, 0, NULL);
600 ldlm_resource_putref(res);
/* Reply callback for OST_DESTROY: drop the in-flight destroy count and wake
 * any thread throttled in osc_can_send_destroy(). */
604 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
607 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
609 atomic_dec(&cli->cl_destroy_in_flight);
610 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Reserve a slot for a destroy RPC.  Optimistically increments the in-flight
 * counter; if that pushes it over cl_max_rpcs_in_flight the reservation is
 * rolled back.  The decrement path re-signals the waitqueue because another
 * thread may have raced between the two atomic ops (see comment at the
 * original line 624). */
614 static int osc_can_send_destroy(struct client_obd *cli)
616 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
617 cli->cl_max_rpcs_in_flight) {
618 /* The destroy request can be sent */
621 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
622 cli->cl_max_rpcs_in_flight) {
624 * The counter has been modified between the two atomic
627 cfs_waitq_signal(&cli->cl_destroy_waitq);
632 /* Destroy requests can be async always on the client, and we don't even really
633 * care about the return code since the client cannot do anything at all about
635 * When the MDS is unlinking a filename, it saves the file objects into a
636 * recovery llog, and these object records are cancelled when the OST reports
637 * they were destroyed and sync'd to disk (i.e. transaction committed).
638 * If the client dies, or the OST is down when the object should be destroyed,
639 * the records are not cancelled, and when the OST reconnects to the MDS next,
640 * it will retrieve the llog unlink logs and then sends the log cancellation
641 * cookies to the MDS after committing destroy transactions. */
/* NOTE(review): subsampled listing -- some branches (ELC bufcount bump,
 * ENOMEM check) are not fully visible here. */
642 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
643 struct lov_stripe_md *ea, struct obd_trans_info *oti,
644 struct obd_export *md_export)
646 CFS_LIST_HEAD(cancels);
647 struct ptlrpc_request *req;
648 struct ost_body *body;
649 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
650 sizeof(struct ldlm_request) };
651 int count, bufcount = 2;
652 struct client_obd *cli = &exp->exp_obd->u.cli;
660 LASSERT(oa->o_id != 0);
/* cancel local PW locks on the object first; data is being discarded */
662 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
663 LDLM_FL_DISCARD_DATA);
664 if (exp_connect_cancelset(exp))
/* piggy-back the lock cancels on the destroy request (early lock cancel) */
666 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
667 size, REQ_REC_OFF + 1, 0, &cancels, count);
671 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
672 ptlrpc_at_set_req_timeout(req);
674 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
676 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
677 oa->o_lcookie = *oti->oti_logcookies;
680 lustre_set_wire_obdo(&body->oa, oa);
681 ptlrpc_req_set_repsize(req, 2, size);
683 /* don't throttle destroy RPCs for the MDT */
684 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
685 req->rq_interpret_reply = osc_destroy_interpret;
686 if (!osc_can_send_destroy(cli)) {
687 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
691 * Wait until the number of on-going destroy RPCs drops
692 * under max_rpc_in_flight
694 l_wait_event_exclusive(cli->cl_destroy_waitq,
695 osc_can_send_destroy(cli), &lwi);
699 /* Do not wait for response */
700 ptlrpcd_add_req(req);
/* Fill the dirty/undirty/grant/dropped accounting fields of @oa from the
 * client_obd state, under cl_loi_list_lock, so the OST can track this
 * client's cache usage.  The CERROR branches are sanity checks on the
 * accounting invariants, not fatal errors. */
704 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
707 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* caller must not have pre-set the fields we are about to fill */
709 LASSERT(!(oa->o_valid & bits));
712 client_obd_list_lock(&cli->cl_loi_list_lock);
713 oa->o_dirty = cli->cl_dirty;
714 if (cli->cl_dirty > cli->cl_dirty_max) {
715 CERROR("dirty %lu > dirty_max %lu\n",
716 cli->cl_dirty, cli->cl_dirty_max);
718 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages + 1) {
719 /* The atomic_read() allowing the atomic_inc() are not covered
720 * by a lock thus they may safely race and trip this CERROR()
721 * unless we add in a small fudge factor (+1). */
722 CERROR("dirty %d > system dirty_max %d\n",
723 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
725 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
726 CERROR("dirty %lu - dirty_max %lu too big???\n",
727 cli->cl_dirty, cli->cl_dirty_max);
/* advertise room for at least a full pipeline of RPCs */
730 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
731 (cli->cl_max_rpcs_in_flight + 1);
732 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
734 oa->o_grant = cli->cl_avail_grant;
735 oa->o_dropped = cli->cl_lost_grant;
/* lost grant is reported once, then reset */
736 cli->cl_lost_grant = 0;
737 client_obd_list_unlock(&cli->cl_loi_list_lock);
738 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
739 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Push the next grant-shrink deadline forward by the configured interval. */
743 static void osc_update_next_shrink(struct client_obd *cli)
745 cli->cl_next_shrink_grant =
746 cfs_time_shift(cli->cl_grant_shrink_interval);
747 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
748 cli->cl_next_shrink_grant);
751 /* caller must hold loi_list_lock */
/* Charge one page of write grant: bump global/per-client dirty accounting,
 * take a page's worth of available grant, and tag the brw_page so the
 * release path knows the grant must be returned. */
752 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
754 atomic_inc(&obd_dirty_pages);
755 cli->cl_dirty += CFS_PAGE_SIZE;
756 cli->cl_avail_grant -= CFS_PAGE_SIZE;
757 pga->flag |= OBD_BRW_FROM_GRANT;
758 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
759 CFS_PAGE_SIZE, pga, pga->pg);
/* callers must never let us drive available grant negative */
760 LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
761 cli->cl_avail_grant);
762 osc_update_next_shrink(cli);
765 /* the companion to osc_consume_write_grant, called when a brw has completed.
766 * must be called with the loi lock held. */
/* NOTE(review): subsampled listing -- the branch structure between the
 * "not sent" and "short write" cases is only partially visible. */
767 static void osc_release_write_grant(struct client_obd *cli,
768 struct brw_page *pga, int sent)
/* OST block size, defaulting to 4k if the server didn't report one */
770 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* pages that never consumed grant have nothing to give back */
773 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
778 pga->flag &= ~OBD_BRW_FROM_GRANT;
779 atomic_dec(&obd_dirty_pages);
780 cli->cl_dirty -= CFS_PAGE_SIZE;
782 cli->cl_lost_grant += CFS_PAGE_SIZE;
783 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
784 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
785 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
786 /* For short writes we shouldn't count parts of pages that
787 * span a whole block on the OST side, or our accounting goes
788 * wrong. Should match the code in filter_grant_check. */
789 int offset = pga->off & ~CFS_PAGE_MASK;
790 int count = pga->count + (offset & (blocksize - 1));
791 int end = (offset + pga->count) & (blocksize - 1);
/* round the tail up to a full OST block */
793 count += blocksize - end;
795 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
796 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
797 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
798 cli->cl_avail_grant, cli->cl_dirty);
/* Total read + write RPCs currently in flight for this client. */
804 static unsigned long rpcs_in_flight(struct client_obd *cli)
806 return cli->cl_r_in_flight + cli->cl_w_in_flight;
809 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake each waiter that can now proceed:
 * either grant is available (consume it on the waiter's behalf) or no write
 * RPCs remain that could return grant, in which case the waiter is told to
 * fall back to sync I/O (-EDQUOT).  Waiters that still cannot be satisfied
 * terminate the walk (see the two early-stop conditions below). */
810 void osc_wake_cache_waiters(struct client_obd *cli)
812 struct list_head *l, *tmp;
813 struct osc_cache_waiter *ocw;
816 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
817 /* if we can't dirty more, we must wait until some is written */
818 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
819 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
820 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
821 "osc max %ld, sys max %d\n", cli->cl_dirty,
822 cli->cl_dirty_max, obd_max_dirty_pages);
826 /* if still dirty cache but no grant wait for pending RPCs that
827 * may yet return us some grant before doing sync writes */
828 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
829 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
830 cli->cl_w_in_flight);
834 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
835 list_del_init(&ocw->ocw_entry);
836 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
837 /* no more RPCs in flight to return grant, do sync IO */
838 ocw->ocw_rc = -EDQUOT;
839 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
841 osc_consume_write_grant(cli,
842 &ocw->ocw_oap->oap_brw_page);
845 cfs_waitq_signal(&ocw->ocw_waitq);
/* Absorb grant returned by the OST in an RPC reply into cl_avail_grant.
 * Waiters are not woken here -- that happens in brw_interpret (see below). */
851 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
853 client_obd_list_lock(&cli->cl_loi_list_lock);
854 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
855 if (body->oa.o_valid & OBD_MD_FLGRANT)
856 cli->cl_avail_grant += body->oa.o_grant;
857 /* waiters are woken in brw_interpret */
858 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Forward declaration: defined later in this file, needed by the
 * grant-shrink path below. */
861 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
862 void *key, obd_count vallen, void *val,
863 struct ptlrpc_request_set *set);
/* Reply callback for a grant-shrink set_info RPC: on error (branch not
 * visible in this listing) the grant we tried to give back is restored to
 * cl_avail_grant; on success the reply's grant fields are absorbed via
 * osc_update_grant().
 * NOTE(review): the swab call passes sizeof(*oa) where siblings pass
 * sizeof(*body); harmless only if ost_body is exactly one obdo -- confirm. */
865 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
868 struct osc_grant_args *aa = data;
869 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
870 struct obdo *oa = aa->aa_oa;
871 struct ost_body *body;
874 client_obd_list_lock(&cli->cl_loi_list_lock);
875 cli->cl_avail_grant += oa->o_grant;
876 client_obd_list_unlock(&cli->cl_loi_list_lock);
879 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
880 lustre_swab_ost_body);
881 osc_update_grant(cli, body);
/* Give back a quarter of the available grant by piggy-backing it on an
 * outgoing request's obdo (OBD_FL_SHRINK_GRANT), and reset the shrink timer. */
887 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
889 client_obd_list_lock(&cli->cl_loi_list_lock);
890 oa->o_grant = cli->cl_avail_grant / 4;
891 cli->cl_avail_grant -= oa->o_grant;
892 client_obd_list_unlock(&cli->cl_loi_list_lock);
893 oa->o_flags |= OBD_FL_SHRINK_GRANT;
894 osc_update_next_shrink(cli);
897 /* Shrink the current grant, either from some large amount to enough for a
898 * full set of in-flight RPCs, or if we have already shrunk to that limit
899 * then to enough for a single RPC. This avoids keeping more grant than
900 * needed, and avoids shrinking the grant piecemeal. */
901 static int osc_shrink_grant(struct client_obd *cli)
/* first target: one full pipeline of max-sized RPCs */
903 long target = (cli->cl_max_rpcs_in_flight + 1) *
904 cli->cl_max_pages_per_rpc;
906 client_obd_list_lock(&cli->cl_loi_list_lock);
/* already at/below the pipeline size: fall back to a single-RPC target */
907 if (cli->cl_avail_grant <= target)
908 target = cli->cl_max_pages_per_rpc;
909 client_obd_list_unlock(&cli->cl_loi_list_lock);
911 return osc_shrink_grant_to_target(cli, target);
/* Return grant above @target to the OST via a KEY_GRANT_SHRINK set_info RPC.
 * No-op if we are already at or below @target.  On RPC submission failure
 * the grant is restored locally.  NOTE(review): subsampled listing -- the
 * body allocation/free and the rc check are not visible here. */
914 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
917 struct ost_body *body;
920 client_obd_list_lock(&cli->cl_loi_list_lock);
921 /* Don't shrink if we are already above or below the desired limit
922 * We don't want to shrink below a single RPC, as that will negatively
923 * impact block allocation and long-term performance. */
924 if (target < cli->cl_max_pages_per_rpc)
925 target = cli->cl_max_pages_per_rpc;
927 if (target >= cli->cl_avail_grant) {
928 client_obd_list_unlock(&cli->cl_loi_list_lock);
931 client_obd_list_unlock(&cli->cl_loi_list_lock);
937 osc_announce_cached(cli, &body->oa, 0);
/* move the excess grant out of our accounting and into the RPC body */
939 client_obd_list_lock(&cli->cl_loi_list_lock);
940 body->oa.o_grant = cli->cl_avail_grant - target;
941 cli->cl_avail_grant = target;
942 client_obd_list_unlock(&cli->cl_loi_list_lock);
943 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
944 osc_update_next_shrink(cli);
946 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
947 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
948 sizeof(*body), body, NULL);
/* failure path: take the grant back since the OST never saw it */
950 client_obd_list_lock(&cli->cl_loi_list_lock);
951 cli->cl_avail_grant += body->oa.o_grant;
952 client_obd_list_unlock(&cli->cl_loi_list_lock);
958 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink grant: the shrink deadline has
 * (nearly) passed, the import is fully connected, and we hold more grant
 * than GRANT_SHRINK_LIMIT.  If the deadline passed but the other conditions
 * fail, just rearm the timer. */
959 static int osc_should_shrink_grant(struct client_obd *client)
961 cfs_time_t time = cfs_time_current();
962 cfs_time_t next_shrink = client->cl_next_shrink_grant;
/* 5-tick slack so we don't miss a deadline by scheduling jitter */
963 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
964 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
965 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
968 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client on this timeout item and
 * shrink grant for those that are due. */
973 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
975 struct client_obd *client;
977 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
978 if (osc_should_shrink_grant(client))
979 osc_shrink_grant(client);
/* Register this client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline. */
984 static int osc_add_shrink_grant(struct client_obd *client)
988 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
990 osc_grant_shrink_grant_cb, NULL,
991 &client->cl_grant_shrink_list);
993 CERROR("add grant client %s error %d\n",
994 client->cl_import->imp_obd->obd_name, rc);
997 CDEBUG(D_CACHE, "add grant client %s \n",
998 client->cl_import->imp_obd->obd_name);
999 osc_update_next_shrink(client);
/* Unregister this client from the periodic grant-shrink timeout list. */
1003 static int osc_del_shrink_grant(struct client_obd *client)
1005 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant accounting from the server's connect data; if the server
 * supports grant shrinking, hook this client into the shrink timer list. */
1009 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1011 client_obd_list_lock(&cli->cl_loi_list_lock);
1012 cli->cl_avail_grant = ocd->ocd_grant;
1013 client_obd_list_unlock(&cli->cl_loi_list_lock);
1015 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1016 list_empty(&cli->cl_grant_shrink_list))
1017 osc_add_shrink_grant(cli);
1019 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1020 cli->cl_avail_grant, cli->cl_lost_grant);
/* the server must never hand us a negative grant */
1021 LASSERT(cli->cl_avail_grant >= 0);
1024 /* We assume that the reason this OSC got a short read is because it read
1025 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1026 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1027 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail: skip fully-read pages, zero the tail of the
 * page containing EOF, then zero every remaining page.  @pshift adjusts the
 * file-offset -> page-offset mapping (OSC_FILE2MEM_OFF).
 * NOTE(review): subsampled listing -- the i++ advances and loop braces are
 * not all visible. */
1028 static void handle_short_read(int nob_read, obd_count page_count,
1029 struct brw_page **pga, int pshift)
1034 /* skip bytes read OK */
1035 while (nob_read > 0) {
1036 LASSERT (page_count > 0);
1038 if (pga[i]->count > nob_read) {
1039 /* EOF inside this page */
1040 ptr = cfs_kmap(pga[i]->pg) +
1041 (OSC_FILE2MEM_OFF(pga[i]->off,pshift)&~CFS_PAGE_MASK);
1042 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1043 cfs_kunmap(pga[i]->pg);
1049 nob_read -= pga[i]->count;
1054 /* zero remaining pages */
1055 while (page_count-- > 0) {
1056 ptr = cfs_kmap(pga[i]->pg) +
1057 (OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK);
1058 memset(ptr, 0, pga[i]->count);
1059 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return codes in a BRW_WRITE reply: the vector must
 * be present, each rc must be exactly 0 (negative -> propagate as error,
 * positive -> protocol violation), and the bulk transfer must have moved
 * exactly the requested number of bytes. */
1064 static int check_write_rcs(struct ptlrpc_request *req,
1065 int requested_nob, int niocount,
1066 obd_count page_count, struct brw_page **pga)
1070 /* return error if any niobuf was in error */
1071 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1072 sizeof(*remote_rcs) * niocount, NULL);
1073 if (remote_rcs == NULL) {
1074 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
/* raw __u32 array: swab manually, there is no per-field swabber */
1077 if (lustre_rep_need_swab(req))
1078 for (i = 0; i < niocount; i++)
1079 __swab32s(&remote_rcs[i]);
1081 for (i = 0; i < niocount; i++) {
1082 if (remote_rcs[i] < 0)
1083 return(remote_rcs[i]);
1085 if (remote_rcs[i] != 0) {
1086 CERROR("rc[%d] invalid (%d) req %p\n",
1087 i, remote_rcs[i], req);
1092 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1093 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1094 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are byte-contiguous; pages
 * whose flags differ (beyond the grant/async bits we know are safe to mix)
 * are kept separate, with a warning for unknown flag combinations. */
1101 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1103 if (p1->flag != p2->flag) {
1104 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1106 /* warn if we try to combine flags that we don't know to be
1107 * safe to combine */
1108 if ((p1->flag & mask) != (p2->flag & mask))
1109 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1110 "same brw?\n", p1->flag, p2->flag);
/* contiguity test: p2 starts exactly where p1 ends */
1114 return (p1->off + p1->count == p2->off);
/* Compute the bulk-I/O checksum over up to @nob bytes of @pga using
 * @cksum_type.  Contains two fault-injection hooks: corrupt received data
 * before checksumming (reads) or return a deliberately wrong checksum
 * (writes) -- the latter avoids corrupting data that may be resent.
 * NOTE(review): subsampled listing -- the i++ advance and final return are
 * not visible here. */
1117 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1118 struct brw_page **pga, int opc,
1119 cksum_type_t cksum_type, int pshift)
1124 LASSERT (pg_count > 0);
1125 cksum = init_checksum(cksum_type);
1126 while (nob > 0 && pg_count > 0) {
1127 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1128 int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
/* last page may be partial */
1129 int count = pga[i]->count > nob ? nob : pga[i]->count;
1131 /* corrupt the data before we compute the checksum, to
1132 * simulate an OST->client data error */
1133 if (i == 0 && opc == OST_READ &&
1134 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1135 memcpy(ptr + off, "bad1", min(4, nob));
1136 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1137 cfs_kunmap(pga[i]->pg);
1138 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1141 nob -= pga[i]->count;
1145 /* For sending we only compute the wrong checksum instead
1146 * of corrupting the data so it is still correct on a redo */
1147 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
1153 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1154 struct lov_stripe_md *lsm, obd_count page_count,
1155 struct brw_page **pga,
1156 struct ptlrpc_request **reqp, int pshift)
1158 struct ptlrpc_request *req;
1159 struct ptlrpc_bulk_desc *desc;
1160 struct ost_body *body;
1161 struct obd_ioobj *ioobj;
1162 struct niobuf_remote *niobuf;
1163 __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1164 int niocount, i, requested_nob, opc, rc;
1165 struct ptlrpc_request_pool *pool;
1166 struct osc_brw_async_args *aa;
1167 struct brw_page *pg_prev;
1170 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1171 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
1173 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1174 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1176 for (niocount = i = 1; i < page_count; i++) {
1177 if (!can_merge_pages(pga[i - 1], pga[i]))
1181 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1182 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1184 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1189 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1190 ptlrpc_at_set_req_timeout(req);
1192 if (opc == OST_WRITE)
1193 desc = ptlrpc_prep_bulk_imp (req, page_count,
1194 BULK_GET_SOURCE, OST_BULK_PORTAL);
1196 desc = ptlrpc_prep_bulk_imp (req, page_count,
1197 BULK_PUT_SINK, OST_BULK_PORTAL);
1199 GOTO(out, rc = -ENOMEM);
1200 /* NB request now owns desc and will free it when it gets freed */
1202 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1203 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1204 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1205 niocount * sizeof(*niobuf));
1207 lustre_set_wire_obdo(&body->oa, oa);
1208 obdo_to_ioobj(oa, ioobj);
1209 ioobj->ioo_bufcnt = niocount;
1211 LASSERT (page_count > 0);
1213 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1214 struct brw_page *pg = pga[i];
1216 LASSERT(pg->count > 0);
1217 LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
1218 pg->count <= CFS_PAGE_SIZE,
1219 "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
1220 i, pg, pg->off, pg->count, pshift);
1222 LASSERTF(i == 0 || pg->off > pg_prev->off,
1223 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1224 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1226 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1227 pg_prev->pg, page_private(pg_prev->pg),
1228 pg_prev->pg->index, pg_prev->off);
1230 LASSERTF(i == 0 || pg->off > pg_prev->off,
1231 "i %d p_c %u\n", i, page_count);
1233 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1234 (pg->flag & OBD_BRW_SRVLOCK));
1236 ptlrpc_prep_bulk_page(desc, pg->pg,
1237 OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
1239 requested_nob += pg->count;
1241 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1243 niobuf->len += pg->count;
1245 niobuf->offset = pg->off;
1246 niobuf->len = pg->count;
1247 niobuf->flags = pg->flag;
1252 LASSERTF((void *)(niobuf - niocount) ==
1253 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1254 niocount * sizeof(*niobuf)),
1255 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1256 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1257 (void *)(niobuf - niocount));
1259 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1260 if (osc_should_shrink_grant(cli))
1261 osc_shrink_grant_local(cli, &body->oa);
1263 /* size[REQ_REC_OFF] still sizeof (*body) */
1264 if (opc == OST_WRITE) {
1265 if (cli->cl_checksum) {
1266 /* store cl_cksum_type in a local variable since
1267 * it can be changed via lprocfs */
1268 cksum_type_t cksum_type = cli->cl_cksum_type;
1270 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1271 oa->o_flags &= OBD_FL_LOCAL_MASK;
1272 body->oa.o_flags = 0;
1274 body->oa.o_flags |= cksum_type_pack(cksum_type);
1275 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1276 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1279 cksum_type, pshift);
1280 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1282 /* save this in 'oa', too, for later checking */
1283 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1284 oa->o_flags |= cksum_type_pack(cksum_type);
1286 /* clear out the checksum flag, in case this is a
1287 * resend but cl_checksum is no longer set. b=11238 */
1288 oa->o_valid &= ~OBD_MD_FLCKSUM;
1290 oa->o_cksum = body->oa.o_cksum;
1291 /* 1 RC per niobuf */
1292 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1293 ptlrpc_req_set_repsize(req, 3, size);
1295 if (cli->cl_checksum) {
1296 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1297 body->oa.o_flags = 0;
1298 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1299 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1301 /* 1 RC for the whole I/O */
1302 ptlrpc_req_set_repsize(req, 2, size);
1305 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1306 aa = ptlrpc_req_async_args(req);
1308 aa->aa_requested_nob = requested_nob;
1309 aa->aa_nio_count = niocount;
1310 aa->aa_page_count = page_count;
1314 aa->aa_pshift = pshift;
1315 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1321 ptlrpc_req_finished (req);
/* Diagnose a write-checksum mismatch reported by the OST.  Recomputes the
 * bulk checksum over the client's pages as they are NOW and compares it
 * against both the original client checksum and the server's, to classify
 * where the corruption most likely happened (client memory, wire, or a
 * checksum-type protocol mismatch).  Emits console/error diagnostics.
 * NOTE(review): the return statement is elided in this view — confirm the
 * return-value convention (0 == "not fatal") against the full source. */
1325 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1326 __u32 client_cksum, __u32 server_cksum, int nob,
1327 obd_count page_count, struct brw_page **pga,
1328 cksum_type_t client_cksum_type, int pshift)
1332 cksum_type_t cksum_type;
/* Fast path: server agreed with what we sent — nothing to diagnose. */
1334 if (server_cksum == client_cksum) {
1335 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* The server echoes the checksum type it actually used in o_flags;
 * fall back to CRC32 for replies that do not carry OBD_MD_FLFLAGS. */
1339 if (oa->o_valid & OBD_MD_FLFLAGS)
1340 cksum_type = cksum_type_unpack(oa->o_flags);
1342 cksum_type = OBD_CKSUM_CRC32;
/* Re-checksum the local pages to distinguish "page changed under us"
 * (e.g. concurrent mmap IO, bug 11742) from genuine wire corruption. */
1344 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1345 cksum_type, pshift);
1347 if (cksum_type != client_cksum_type)
1348 msg = "the server did not use the checksum type specified in "
1349 "the original request - likely a protocol problem";
1350 else if (new_cksum == server_cksum)
1351 msg = "changed on the client after we checksummed it - "
1352 "likely false positive due to mmap IO (bug 11742)";
1353 else if (new_cksum == client_cksum)
1354 msg = "changed in transit before arrival at OST";
1356 msg = "changed in transit AND doesn't match the original - "
1357 "likely false positive due to mmap IO (bug 11742)";
/* Console message 0x132: includes fid/generation/group only when the
 * corresponding o_valid bits are set, and the byte extent of the I/O. */
1359 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1360 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1361 "["LPU64"-"LPU64"]\n",
1362 msg, libcfs_nid2str(peer->nid),
1363 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1364 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1367 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1369 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1370 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1371 "client csum now %x\n", client_cksum, client_cksum_type,
1372 server_cksum, cksum_type, new_cksum);
/* Post-process a completed BRW (bulk read/write) RPC: unpack and verify
 * the reply, update quota/grant state, verify checksums, and for reads
 * handle short transfers.  Shared by the sync (osc_brw_internal) and
 * async (brw_interpret) completion paths. */
1377 /* Note rc enters this function as number of bytes transferred */
1378 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1380 struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1381 const lnet_process_id_t *peer =
1382 &req->rq_import->imp_connection->c_peer;
1383 struct client_obd *cli = aa->aa_cli;
1384 struct ost_body *body;
1385 __u32 client_cksum = 0;
/* -EDQUOT replies still carry a valid body (quota flags below); any
 * other error short-circuits.  (Early-return path elided in this view.) */
1388 if (rc < 0 && rc != -EDQUOT)
1391 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1392 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1393 lustre_swab_ost_body);
1395 CERROR ("Can't unpack body\n");
1399 /* set/clear over quota flag for a uid/gid */
1400 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1401 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1402 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1403 body->oa.o_gid, body->oa.o_valid,
1409 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1410 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1412 osc_update_grant(cli, body);
/* Write path: verify the server-side checksum against what we sent,
 * then collect the per-niobuf return codes. */
1414 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1416 CERROR ("Unexpected +ve rc %d\n", rc);
1419 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1421 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1422 check_write_checksum(&body->oa, peer, client_cksum,
1423 body->oa.o_cksum, aa->aa_requested_nob,
1424 aa->aa_page_count, aa->aa_ppga,
1425 cksum_type_unpack(aa->aa_oa->o_flags),
1429 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1430 aa->aa_page_count, aa->aa_ppga);
1434 /* The rest of this function executes only for OST_READs */
/* Sanity: the server can return fewer bytes than requested (short
 * read) but never more, and rc must match what the bulk transferred. */
1435 if (rc > aa->aa_requested_nob) {
1436 CERROR("Unexpected rc %d (%d requested)\n", rc,
1437 aa->aa_requested_nob);
1441 if (rc != req->rq_bulk->bd_nob_transferred) {
1442 CERROR ("Unexpected rc %d (%d transferred)\n",
1443 rc, req->rq_bulk->bd_nob_transferred);
1447 if (rc < aa->aa_requested_nob)
1448 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga, aa->aa_pshift);
/* Read-side checksum verification, when the server supplied one. */
1450 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1451 static int cksum_counter;
1452 __u32 server_cksum = body->oa.o_cksum;
1455 cksum_type_t cksum_type;
1457 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1458 cksum_type = cksum_type_unpack(body->oa.o_flags);
1460 cksum_type = OBD_CKSUM_CRC32;
1461 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1462 aa->aa_ppga, OST_READ,
1463 cksum_type, aa->aa_pshift);
/* Identify an intermediate LNET router, if the bulk came via one,
 * so the mismatch message can point at it. */
1465 if (peer->nid == req->rq_bulk->bd_sender) {
1469 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1472 if (server_cksum == ~0 && rc > 0) {
1473 CERROR("Protocol error: server %s set the 'checksum' "
1474 "bit, but didn't send a checksum. Not fatal, "
1475 "but please notify on http://bugzilla.lustre.org/\n",
1476 libcfs_nid2str(peer->nid));
1477 } else if (server_cksum != client_cksum) {
1478 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1479 "%s%s%s inum "LPU64"/"LPU64" object "
1480 LPU64"/"LPU64" extent "
1481 "["LPU64"-"LPU64"]\n",
1482 req->rq_import->imp_obd->obd_name,
1483 libcfs_nid2str(peer->nid),
1485 body->oa.o_valid & OBD_MD_FLFID ?
1486 body->oa.o_fid : (__u64)0,
1487 body->oa.o_valid & OBD_MD_FLFID ?
1488 body->oa.o_generation :(__u64)0,
1490 body->oa.o_valid & OBD_MD_FLGROUP ?
1491 body->oa.o_gr : (__u64)0,
1492 aa->aa_ppga[0]->off,
1493 aa->aa_ppga[aa->aa_page_count-1]->off +
1494 aa->aa_ppga[aa->aa_page_count-1]->count -
1496 CERROR("client %x, server %x, cksum_type %x\n",
1497 client_cksum, server_cksum, cksum_type);
1499 aa->aa_oa->o_cksum = client_cksum;
1503 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server didn't send one; rate-limit
 * the complaint to power-of-two occurrences of cksum_missed. */
1506 } else if (unlikely(client_cksum)) {
1507 static int cksum_missed;
1510 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1511 CERROR("Checksum %u requested from %s but not sent\n",
1512 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy the server-updated obdo (size/blocks/times etc.) back to caller. */
1518 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/* Synchronous bulk read/write: build one BRW RPC, queue it and wait for
 * completion, then retry on recoverable errors with an interruptible
 * back-off (the l_wait_event timeout grows with the resend count).
 * NOTE(review): the retry loop structure (goto/labels) is elided in this
 * view — confirm against the full source. */
1523 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1524 struct lov_stripe_md *lsm,
1525 obd_count page_count, struct brw_page **pga)
1527 struct ptlrpc_request *request;
1531 struct l_wait_info lwi;
1534 init_waitqueue_head(&waitq);
/* pshift == 0: the sync path does not support unaligned direct I/O. */
1537 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1538 page_count, pga, &request, 0);
1542 rc = ptlrpc_queue_wait(request);
/* Bulk timeout with resend flag set: drop this request and rebuild. */
1544 if (rc == -ETIMEDOUT && request->rq_resend) {
1545 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1546 ptlrpc_req_finished(request);
1550 rc = osc_brw_fini_request(request, rc);
1552 ptlrpc_req_finished(request);
1553 if (osc_recoverable_error(rc)) {
1555 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1556 CERROR("too many resend retries, returning error\n");
/* Sleep 'resends' seconds (interruptible) before trying again. */
1560 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1561 l_wait_event(waitq, 0, &lwi);
/* Rebuild and resubmit a failed async BRW RPC.  A brand-new request is
 * prepared from the saved async args, ownership of the page array and
 * the oap list is transferred to it, and it is added back to the
 * original request set.  Caller context: invoked from brw_interpret()
 * on a recoverable error. */
1568 int osc_brw_redo_request(struct ptlrpc_request *request,
1569 struct osc_brw_async_args *aa)
1571 struct ptlrpc_request *new_req;
1572 struct ptlrpc_request_set *set = request->rq_set;
1573 struct osc_brw_async_args *new_aa;
1574 struct osc_async_page *oap;
1578 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1579 CERROR("too many resend retries, returning error\n");
1583 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* Recreate the RPC with the same direction, pages and obdo. */
1585 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1586 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1587 aa->aa_cli, aa->aa_oa,
1588 NULL /* lsm unused by osc currently */,
1589 aa->aa_page_count, aa->aa_ppga, &new_req,
1594 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* If any page of the old RPC was interrupted, abandon the redo: drop
 * the new request and let the normal completion path run. */
1596 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1597 if (oap->oap_request != NULL) {
1598 LASSERTF(request == oap->oap_request,
1599 "request %p != oap_request %p\n",
1600 request, oap->oap_request);
1601 if (oap->oap_interrupted) {
1602 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1603 ptlrpc_req_finished(new_req);
1608 /* New request takes over pga and oaps from old request.
1609 * Note that copying a list_head doesn't work, need to move it... */
1611 new_req->rq_interpret_reply = request->rq_interpret_reply;
1612 new_req->rq_async_args = request->rq_async_args;
/* Delay dispatch by the resend count (simple linear back-off). */
1613 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1615 new_aa = ptlrpc_req_async_args(new_req);
1617 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1618 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1619 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Repoint each oap's request reference at the new RPC. */
1621 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1622 if (oap->oap_request) {
1623 ptlrpc_req_finished(oap->oap_request);
1624 oap->oap_request = ptlrpc_request_addref(new_req);
1628 /* use ptlrpc_set_add_req is safe because interpret functions work
1629 * in check_set context. only one way exist with access to request
1630 * from different thread got -EINTR - this way protected with
1631 * cl_loi_list_lock */
1632 ptlrpc_set_add_req(set, new_req);
1634 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1636 DEBUG_REQ(D_INFO, new_req, "new request");
/* Asynchronous bulk read/write: take write grant up-front, build the BRW
 * RPC, record lprocfs stats, and add the request to the caller's set.
 * On success bumps the in-flight counter; on failure (the OBD_BRW_WRITE
 * branch at the bottom) returns the grant and wakes cache waiters. */
1640 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1641 struct lov_stripe_md *lsm, obd_count page_count,
1642 struct brw_page **pga, struct ptlrpc_request_set *set,
1645 struct ptlrpc_request *request;
1646 struct client_obd *cli = &exp->exp_obd->u.cli;
1648 struct osc_brw_async_args *aa;
1651 /* Consume write credits even if doing a sync write -
1652 * otherwise we may run out of space on OST due to grant. */
1653 /* FIXME: unaligned writes must use write grants too */
1654 if (cmd == OBD_BRW_WRITE && pshift == 0) {
1655 client_obd_list_lock(&cli->cl_loi_list_lock);
1656 for (i = 0; i < page_count; i++) {
1657 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1658 osc_consume_write_grant(cli, pga[i]);
1660 client_obd_list_unlock(&cli->cl_loi_list_lock);
1663 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1664 page_count, pga, &request, pshift);
1666 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
/* Per-RPC statistics: page-count histogram and RPCs-in-flight. */
1669 aa = ptlrpc_req_async_args(request);
1670 if (cmd == OBD_BRW_READ) {
1671 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1672 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1674 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1675 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1676 cli->cl_w_in_flight);
1678 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1680 LASSERT(list_empty(&aa->aa_oaps));
/* brw_interpret() will run osc_brw_fini_request() on completion. */
1682 request->rq_interpret_reply = brw_interpret;
1683 ptlrpc_set_add_req(set, request);
1684 client_obd_list_lock(&cli->cl_loi_list_lock);
1685 if (cmd == OBD_BRW_READ)
1686 cli->cl_r_in_flight++;
1688 cli->cl_w_in_flight++;
1689 client_obd_list_unlock(&cli->cl_loi_list_lock);
1690 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
/* prep_request failed for a write: give back the grant taken above. */
1691 } else if (cmd == OBD_BRW_WRITE) {
1692 client_obd_list_lock(&cli->cl_loi_list_lock);
1693 for (i = 0; i < page_count; i++)
1694 osc_release_write_grant(cli, pga[i], 0);
1695 osc_wake_cache_waiters(cli);
1696 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Shell sort of the brw_page array by file offset (ascending).  In-place,
 * no allocation; stride sequence is Knuth's 3h+1. */
1703 * ugh, we want disk allocation on the target to happen in offset order. we'll
1704 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1705 * fine for our small page arrays and doesn't require allocation. its an
1706 * insertion sort that swaps elements that are strides apart, shrinking the
1707 * stride down until its '1' and the array is sorted.
1709 static void sort_brw_pages(struct brw_page **array, int num)
1712 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num. */
1716 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort for the current stride. */
1721 for (i = stride ; i < num ; i++) {
1724 while (j >= stride && array[j-stride]->off > tmp->off) {
1725 array[j] = array[j - stride];
1730 } while (stride > 1);
/* Walk a sorted page array and return how many leading pages form a
 * single unfragmented span: every interior page must start and end on a
 * CFS_PAGE boundary (after applying the pshift file-to-memory offset),
 * so the whole run can go out as one RDMA.  Returns the count of pages
 * that qualify (at least 1, per the LASSERT). */
1733 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1740 LASSERT (pages > 0);
1741 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1745 if (pages == 0) /* that's all */
1748 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1749 return count; /* doesn't end on page boundary */
1752 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1753 if (offset != 0) /* doesn't start on page boundary */
/* Build an array of pointers into a contiguous brw_page array, so the
 * pages can be sorted/partitioned without moving the pages themselves.
 * Caller frees with osc_release_ppga().  Returns NULL on allocation
 * failure (failure branch elided in this view). */
1760 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1762 struct brw_page **ppga;
1765 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1769 for (i = 0; i < count; i++)
/* Free a pointer array produced by osc_build_ppga().  'count' must be the
 * original allocation count, not a partially-consumed remainder. */
1774 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1776 LASSERT(ppga != NULL);
1777 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Top-level synchronous BRW entry point.  Sorts the pages by offset,
 * splits the I/O into RPC-sized, unfragmented chunks, and issues each
 * chunk through osc_brw_internal().  Because the server clobbers the
 * obdo on each RPC, a saved copy is restored between chunks. */
1780 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1781 obd_count page_count, struct brw_page *pga,
1782 struct obd_trans_info *oti)
1784 struct obdo *saved_oa = NULL;
1785 struct brw_page **ppga, **orig;
1786 struct obd_import *imp = class_exp2cliimp(exp);
1787 struct client_obd *cli;
1788 int rc, page_count_orig;
1791 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1792 cli = &imp->imp_obd->u.cli;
1794 if (cmd & OBD_BRW_CHECK) {
1795 /* The caller just wants to know if there's a chance that this
1796 * I/O can succeed */
1798 if (imp->imp_invalid)
1803 /* test_brw with a failed create can trip this, maybe others. */
1804 LASSERT(cli->cl_max_pages_per_rpc);
1808 orig = ppga = osc_build_ppga(pga, page_count);
/* Remember the full count: ppga/page_count are consumed chunk by chunk
 * below, but the release at the end must cover the whole array. */
1811 page_count_orig = page_count;
1813 sort_brw_pages(ppga, page_count);
1814 while (page_count) {
1815 obd_count pages_per_brw;
1817 if (page_count > cli->cl_max_pages_per_rpc)
1818 pages_per_brw = cli->cl_max_pages_per_rpc;
1820 pages_per_brw = page_count;
/* Shrink the chunk further so it transfers as one RDMA. */
1822 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);
1824 if (saved_oa != NULL) {
1825 /* restore previously saved oa */
1826 *oinfo->oi_oa = *saved_oa;
1827 } else if (page_count > pages_per_brw) {
1828 /* save a copy of oa (brw will clobber it) */
1829 OBDO_ALLOC(saved_oa);
1830 if (saved_oa == NULL)
1831 GOTO(out, rc = -ENOMEM);
1832 *saved_oa = *oinfo->oi_oa;
1835 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1836 pages_per_brw, ppga);
1841 page_count -= pages_per_brw;
1842 ppga += pages_per_brw;
1846 osc_release_ppga(orig, page_count_orig);
1848 if (saved_oa != NULL)
1849 OBDO_FREE(saved_oa);
/* Top-level asynchronous BRW entry point.  Like osc_brw(), but each chunk
 * is queued on 'set' via async_internal() instead of being waited on.
 * When the I/O is split into multiple RPCs, each RPC gets its own copy of
 * the page-pointer array and a private, OBD_FL_TEMPORARY-tagged obdo
 * (freed by brw_interpret() at completion); a single-RPC I/O reuses the
 * caller's ppga and obdo directly. */
1854 static int osc_brw_async(int cmd, struct obd_export *exp,
1855 struct obd_info *oinfo, obd_count page_count,
1856 struct brw_page *pga, struct obd_trans_info *oti,
1857 struct ptlrpc_request_set *set, int pshift)
1859 struct brw_page **ppga, **orig;
1860 int page_count_orig;
1864 if (cmd & OBD_BRW_CHECK) {
1865 /* The caller just wants to know if there's a chance that this
1866 * I/O can succeed */
1867 struct obd_import *imp = class_exp2cliimp(exp);
1869 if (imp == NULL || imp->imp_invalid)
1874 orig = ppga = osc_build_ppga(pga, page_count);
1877 page_count_orig = page_count;
1879 sort_brw_pages(ppga, page_count);
1880 while (page_count) {
1881 struct brw_page **copy;
1883 obd_count pages_per_brw;
1885 /* one page less under unaligned direct i/o */
1886 pages_per_brw = min_t(obd_count, page_count,
1887 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
1890 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
1893 /* use ppga only if single RPC is going to fly */
1894 if (pages_per_brw != page_count_orig || ppga != orig) {
1895 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1897 GOTO(out, rc = -ENOMEM);
1898 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
/* Private obdo for this chunk (allocation elided in this view);
 * OBD_FL_TEMPORARY marks it for freeing on RPC completion. */
1902 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1903 GOTO(out, rc = -ENOMEM);
1905 memcpy(oa, oinfo->oi_oa, sizeof(*oa));
1906 oa->o_flags |= OBD_FL_TEMPORARY;
1910 LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
1913 rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
/* Failure path: undo the per-chunk copies made above. */
1918 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1920 if (oa->o_flags & OBD_FL_TEMPORARY)
1926 /* we passed it to async_internal() which is
1927 * now responsible for releasing memory */
1931 page_count -= pages_per_brw;
1932 ppga += pages_per_brw;
1936 osc_release_ppga(orig, page_count_orig);
1940 static void osc_check_rpcs(struct client_obd *cli);
1942 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1943 * the dirty accounting. Writeback completes or truncate happens before
1944 * writing starts. Must be called with the loi lock held. */
1945 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
/* Returns the page's write grant; 'sent' tells the grant code whether the
 * data actually went to the OST. */
1948 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1951 /* This maintains the lists of pending pages to read/write for a given object
1952 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1953 * to quickly find objects that are ready to send an RPC. */
/* Predicate: should an RPC be fired for this lop right now?  True when the
 * import is invalid (drain), an urgent page is queued, cache waiters exist
 * (writes), or enough pages have accumulated to fill an "optimal" RPC. */
1954 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1960 if (lop->lop_num_pending == 0)
1963 /* if we have an invalid import we want to drain the queued pages
1964 * by forcing them through rpcs that immediately fail and complete
1965 * the pages. recovery relies on this to empty the queued pages
1966 * before canceling the locks and evicting down the llite pages */
1967 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1970 /* stream rpcs in queue order as long as as there is an urgent page
1971 * queued. this is our cheap solution for good batching in the case
1972 * where writepage marks some random page in the middle of the file
1973 * as urgent because of, say, memory pressure */
1974 if (!list_empty(&lop->lop_urgent)) {
1975 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1979 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1980 optimal = cli->cl_max_pages_per_rpc;
1981 if (cmd & OBD_BRW_WRITE) {
1982 /* trigger a write rpc stream as long as there are dirtiers
1983 * waiting for space. as they're waiting, they're not going to
1984 * create more pages to coallesce with what's waiting.. */
1985 if (!list_empty(&cli->cl_cache_waiters)) {
1986 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1990 /* +16 to avoid triggering rpcs that would want to include pages
1991 * that are being queued but which can't be made ready until
1992 * the queuer finishes with the page. this is a wart for
1993 * llite::commit_write() */
1996 if (lop->lop_num_pending >= optimal)
/* Predicate: does this lop's urgent queue start with a high-priority
 * (ASYNC_HP) page?  Only the head needs checking because ASYNC_HP pages
 * are queued at the front (see osc_oap_to_pending). */
2002 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2004 struct osc_async_page *oap;
2007 if (list_empty(&lop->lop_urgent))
2010 oap = list_entry(lop->lop_urgent.next,
2011 struct osc_async_page, oap_urgent_item);
2013 if (oap->oap_async_flags & ASYNC_HP) {
2014 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/* Idempotently add 'item' to 'list' or remove it, so that membership
 * matches the boolean 'should_be_on'.  No-op when already consistent. */
2021 static void on_list(struct list_head *item, struct list_head *list,
2024 if (list_empty(item) && should_be_on)
2025 list_add_tail(item, list);
2026 else if (!list_empty(item) && !should_be_on)
2027 list_del_init(item);
2030 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2031 * can find pages to build into rpcs quickly */
2032 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
/* An object is on exactly one of the hp-ready / ready lists: hp wins. */
2034 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2035 lop_makes_hprpc(&loi->loi_read_lop)) {
2037 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2038 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2040 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2041 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2042 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2043 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
/* Secondary lists: membership simply tracks "has pending pages". */
2046 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2047 loi->loi_write_lop.lop_num_pending);
2049 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2050 loi->loi_read_lop.lop_num_pending);
/* Adjust the pending-page count for a lop and mirror the change into the
 * client-wide read/write pending counters.  'delta' may be negative. */
2053 static void lop_update_pending(struct client_obd *cli,
2054 struct loi_oap_pages *lop, int cmd, int delta)
2056 lop->lop_num_pending += delta;
2057 if (cmd & OBD_BRW_WRITE)
2058 cli->cl_pending_w_pages += delta;
2060 cli->cl_pending_r_pages += delta;
2063 /* this is called when a sync waiter receives an interruption. Its job is to
2064 * get the caller woken as soon as possible. If its page hasn't been put in an
2065 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2066 * desiring interruption which will forcefully complete the rpc once the rpc
2068 static void osc_occ_interrupted(struct oig_callback_context *occ)
2070 struct osc_async_page *oap;
2071 struct loi_oap_pages *lop;
2072 struct lov_oinfo *loi;
2075 /* XXX member_of() */
/* Recover the containing oap from the embedded callback context. */
2076 oap = list_entry(occ, struct osc_async_page, oap_occ);
2078 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
2080 oap->oap_interrupted = 1;
2082 /* ok, it's been put in an rpc. only one oap gets a request reference */
2083 if (oap->oap_request != NULL) {
2084 ptlrpc_mark_interrupted(oap->oap_request);
2085 ptlrpcd_wake(oap->oap_request);
2089 /* we don't get interruption callbacks until osc_trigger_group_io()
2090 * has been called and put the sync oaps in the pending/urgent lists.*/
2091 if (!list_empty(&oap->oap_pending_item)) {
2092 list_del_init(&oap->oap_pending_item);
2093 list_del_init(&oap->oap_urgent_item);
/* Fix up the per-object accounting and list membership after the
 * dequeue, then complete the group-IO slot with -EINTR. */
2096 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2097 &loi->loi_write_lop : &loi->loi_read_lop;
2098 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2099 loi_list_maint(oap->oap_cli, oap->oap_loi);
2101 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
2102 oap->oap_oig = NULL;
2106 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
2109 /* this is trying to propogate async writeback errors back up to the
2110 * application. As an async write fails we record the error code for later if
2111 * the app does an fsync. As long as errors persist we force future rpcs to be
2112 * sync so that the app can get a sync error and break the cycle of queueing
2113 * pages for which writeback will fail. */
2114 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
/* On error (branch condition elided in this view): latch force_sync and
 * remember the next xid; only RPCs at or past that xid can clear it. */
2121 ar->ar_force_sync = 1;
2122 ar->ar_min_xid = ptlrpc_sample_next_xid();
/* A successful RPC issued after the failure clears the sync-force state. */
2127 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2128 ar->ar_force_sync = 0;
/* Queue an async page on its object's read or write lop.  ASYNC_HP pages
 * go to the FRONT of the urgent list (so lop_makes_hprpc() can check just
 * the head); ASYNC_URGENT pages go to the back; all pages also join the
 * pending list and are counted via lop_update_pending(). */
2131 static void osc_oap_to_pending(struct osc_async_page *oap)
2133 struct loi_oap_pages *lop;
2135 if (oap->oap_cmd & OBD_BRW_WRITE)
2136 lop = &oap->oap_loi->loi_write_lop;
2138 lop = &oap->oap_loi->loi_read_lop;
2140 if (oap->oap_async_flags & ASYNC_HP)
2141 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2142 else if (oap->oap_async_flags & ASYNC_URGENT)
2143 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2144 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2145 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2148 /* this must be called holding the loi list lock to give coverage to exit_cache,
2149 * async_flag maintenance, and oap_request */
/* Finish one async page: drop its request reference, record write errors
 * for fsync propagation, fold server attributes into the loi lvb, and
 * either complete the group-IO slot or call the upper layer's
 * ap_completion hook (which may requeue the page). */
2150 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
2151 struct osc_async_page *oap, int sent, int rc)
2156 if (oap->oap_request != NULL) {
/* Capture the xid before releasing the request: osc_process_ar()
 * needs it to order errors against later RPCs. */
2157 xid = ptlrpc_req_xid(oap->oap_request);
2158 ptlrpc_req_finished(oap->oap_request);
2159 oap->oap_request = NULL;
2162 spin_lock(&oap->oap_lock);
2163 oap->oap_async_flags = 0;
2164 spin_unlock(&oap->oap_lock);
2165 oap->oap_interrupted = 0;
2167 if (oap->oap_cmd & OBD_BRW_WRITE) {
2168 osc_process_ar(&cli->cl_ar, xid, rc);
2169 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
/* Mirror server-returned attributes into the cached lock value block. */
2172 if (rc == 0 && oa != NULL) {
2173 if (oa->o_valid & OBD_MD_FLBLOCKS)
2174 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2175 if (oa->o_valid & OBD_MD_FLMTIME)
2176 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2177 if (oa->o_valid & OBD_MD_FLATIME)
2178 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2179 if (oa->o_valid & OBD_MD_FLCTIME)
2180 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
/* Group-IO path: release cache accounting and signal the group. */
2184 osc_exit_cache(cli, oap, sent);
2185 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2186 oap->oap_oig = NULL;
2191 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2192 oap->oap_cmd, oa, rc);
2194 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2195 * I/O on the page could start, but OSC calls it under lock
2196 * and thus we can add oap back to pending safely */
2198 /* upper layer wants to leave the page on pending queue */
2199 osc_oap_to_pending(oap);
2201 osc_exit_cache(cli, oap, sent);
/* ptlrpc interpret callback for async BRW RPCs.  Finishes the reply
 * (checksums, grant), retries recoverable errors via
 * osc_brw_redo_request(), then under cl_loi_list_lock decrements the
 * in-flight counter, completes each attached oap (or releases grant for
 * the oap-less async_internal() path), and kicks the RPC engine. */
2205 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2207 struct osc_brw_async_args *aa = data;
2208 struct client_obd *cli;
2211 rc = osc_brw_fini_request(request, rc);
2212 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2214 if (osc_recoverable_error(rc)) {
2215 rc = osc_brw_redo_request(request, aa);
2221 client_obd_list_lock(&cli->cl_loi_list_lock);
2222 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2223 * is called so we know whether to go to sync BRWs or wait for more
2224 * RPCs to complete */
2225 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2226 cli->cl_w_in_flight--;
2228 cli->cl_r_in_flight--;
2230 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2231 struct osc_async_page *oap, *tmp;
2232 /* the caller may re-use the oap after the completion call so
2233 * we need to clean it up a little */
2234 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2235 list_del_init(&oap->oap_rpc_item);
2236 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2238 OBDO_FREE(aa->aa_oa);
2239 } else { /* from async_internal() */
2241 for (i = 0; i < aa->aa_page_count; i++)
2242 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
/* Only temporary obdos (multi-chunk osc_brw_async) are ours to free. */
2244 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2245 OBDO_FREE(aa->aa_oa);
2247 osc_wake_cache_waiters(cli);
2248 osc_check_rpcs(cli);
2249 client_obd_list_unlock(&cli->cl_loi_list_lock);
2251 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build one BRW RPC from a list of ready async pages: collect their
 * brw_pages into a sorted array, fill an obdo via the caller-ops hooks,
 * attach the lockless-IO lock handle when present, and prep the request.
 * On success the oap list is moved into the request's async args and
 * 'rpc_list' is left empty; on failure returns ERR_PTR(rc). */
2256 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2257 struct list_head *rpc_list,
2258 int page_count, int cmd)
2260 struct ptlrpc_request *req;
2261 struct brw_page **pga = NULL;
2262 struct osc_brw_async_args *aa;
2263 struct obdo *oa = NULL;
2264 struct obd_async_page_ops *ops = NULL;
2265 void *caller_data = NULL;
2266 struct osc_async_page *oap;
2267 struct ldlm_lock *lock = NULL;
2272 LASSERT(!list_empty(rpc_list));
2274 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2276 RETURN(ERR_PTR(-ENOMEM));
2280 GOTO(out, req = ERR_PTR(-ENOMEM));
/* Gather page descriptors; ops/caller_data/lock are taken from the
 * oaps (set once per loop in the full source — elided lines here). */
2283 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2285 ops = oap->oap_caller_ops;
2286 caller_data = oap->oap_caller_data;
2287 lock = oap->oap_ldlm_lock;
2289 pga[i] = &oap->oap_brw_page;
2290 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2291 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2292 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2296 /* always get the data for the obdo for the rpc */
2297 LASSERT(ops != NULL);
2298 ops->ap_fill_obdo(caller_data, cmd, oa);
2300 oa->o_handle = lock->l_remote_handle;
2301 oa->o_valid |= OBD_MD_FLHANDLE;
2304 sort_brw_pages(pga, page_count);
2305 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
2307 CERROR("prep_req failed: %d\n", rc);
2308 GOTO(out, req = ERR_PTR(rc));
/* From here on, work on the obdo embedded in the request message. */
2310 oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2311 sizeof(struct ost_body)))->oa;
2313 /* Need to update the timestamps after the request is built in case
2314 * we race with setattr (locally or in queue at OST). If OST gets
2315 * later setattr before earlier BRW (as determined by the request xid),
2316 * the OST will not use BRW timestamps. Sadly, there is no obvious
2317 * way to do this in a single call. bug 10150 */
2318 if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2319 /* in case of lockless read/write do not use inode's
2320 * timestamps because concurrent stat might fill the
2321 * inode with out-of-date times, send current
2323 if (cmd & OBD_BRW_WRITE) {
2324 oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2325 oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2326 valid = OBD_MD_FLATIME;
2328 oa->o_atime = LTIME_S(CURRENT_TIME);
2329 oa->o_valid |= OBD_MD_FLATIME;
2330 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2333 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2335 ops->ap_update_obdo(caller_data, cmd, oa, valid);
2337 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2338 aa = ptlrpc_req_async_args(req);
/* Hand the oap list to the request; rpc_list is reinitialized empty. */
2339 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2340 list_splice(rpc_list, &aa->aa_oaps);
2341 CFS_INIT_LIST_HEAD(rpc_list);
2348 OBD_FREE(pga, sizeof(*pga) * page_count);
2353 /* the loi lock is held across this function but it's allowed to release
2354 * and reacquire it during its work */
2356 * prepare pages for ASYNC io and put pages in send queue.
2360 * \param cmd - OBD_BRW_* macros
2361 * \param lop - pending pages
2363 * \return zero if pages successfully add to send queue.
2364 * \return non-zero if an error occurred.
2366 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2367 int cmd, struct loi_oap_pages *lop)
2369 struct ptlrpc_request *req;
2370 obd_count page_count = 0;
2371 struct osc_async_page *oap = NULL, *tmp;
2372 struct osc_brw_async_args *aa;
2373 struct obd_async_page_ops *ops;
2374 CFS_LIST_HEAD(rpc_list);
2375 unsigned int ending_offset;
2376 unsigned starting_offset = 0;
2380 /* If there are HP OAPs we need to handle at least 1 of them,
2381 * move it the beginning of the pending list for that. */
2382 if (!list_empty(&lop->lop_urgent)) {
2383 oap = list_entry(lop->lop_urgent.next,
2384 struct osc_async_page, oap_urgent_item);
2385 if (oap->oap_async_flags & ASYNC_HP)
2386 list_move(&oap->oap_pending_item, &lop->lop_pending);
2389 /* first we find the pages we're allowed to work with */
2390 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2391 ops = oap->oap_caller_ops;
2393 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2394 "magic 0x%x\n", oap, oap->oap_magic);
/* all pages in a single BRW RPC must agree on OBD_BRW_SRVLOCK; a
 * mismatching page ends the RPC at the pages gathered so far */
2396 if (page_count != 0 &&
2397 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2398 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2399 " oap %p, page %p, srvlock %u\n",
2400 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2403 /* in llite being 'ready' equates to the page being locked
2404 * until completion unlocks it. commit_write submits a page
2405 * as not ready because its unlock will happen unconditionally
2406 * as the call returns. if we race with commit_write giving
2407 * us that page we don't want to create a hole in the page
2408 * stream, so we stop and leave the rpc to be fired by
2409 * another dirtier or kupdated interval (the not ready page
2410 * will still be on the dirty list). we could call in
2411 * at the end of ll_file_write to process the queue again. */
2412 if (!(oap->oap_async_flags & ASYNC_READY)) {
2413 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2415 CDEBUG(D_INODE, "oap %p page %p returned %d "
2416 "instead of ready\n", oap,
2420 /* llite is telling us that the page is still
2421 * in commit_write and that we should try
2422 * and put it in an rpc again later. we
2423 * break out of the loop so we don't create
2424 * a hole in the sequence of pages in the rpc
2429 /* the io isn't needed.. tell the checks
2430 * below to complete the rpc with EINTR */
2431 spin_lock(&oap->oap_lock);
2432 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2433 spin_unlock(&oap->oap_lock);
2434 oap->oap_count = -EINTR;
2437 spin_lock(&oap->oap_lock);
2438 oap->oap_async_flags |= ASYNC_READY;
2439 spin_unlock(&oap->oap_lock);
2442 LASSERTF(0, "oap %p page %p returned %d "
2443 "from make_ready\n", oap,
2451 * Page submitted for IO has to be locked. Either by
2452 * ->ap_make_ready() or by higher layers.
2454 #if defined(__KERNEL__) && defined(__linux__)
2455 if(!(PageLocked(oap->oap_page) &&
2456 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2457 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2458 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2462 /* If there is a gap at the start of this page, it can't merge
2463 * with any previous page, so we'll hand the network a
2464 * "fragmented" page array that it can't transfer in 1 RDMA */
2465 if (page_count != 0 && oap->oap_page_off != 0)
2468 /* take the page out of our book-keeping */
2469 list_del_init(&oap->oap_pending_item);
2470 lop_update_pending(cli, lop, cmd, -1);
2471 list_del_init(&oap->oap_urgent_item);
2473 if (page_count == 0)
2474 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2475 (PTLRPC_MAX_BRW_SIZE - 1);
2477 /* ask the caller for the size of the io as the rpc leaves. */
2478 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2480 ops->ap_refresh_count(oap->oap_caller_data,cmd);
/* a refreshed count of <= 0 means no IO is needed any more;
 * complete the page immediately instead of sending it */
2481 if (oap->oap_count <= 0) {
2482 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2484 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2488 /* now put the page back in our accounting */
2489 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2490 if (page_count == 0)
2491 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2492 if (++page_count >= cli->cl_max_pages_per_rpc)
2495 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2496 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2497 * have the same alignment as the initial writes that allocated
2498 * extents on the server. */
2499 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2500 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2501 if (ending_offset == 0)
2504 /* If there is a gap at the end of this page, it can't merge
2505 * with any subsequent pages, so we'll hand the network a
2506 * "fragmented" page array that it can't transfer in 1 RDMA */
2507 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
/* pages taken off the pending list may have freed dirty/grant
 * accounting; let any blocked cache waiters retry */
2511 osc_wake_cache_waiters(cli);
2513 if (page_count == 0)
2516 loi_list_maint(cli, loi);
2518 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* NOTE(review): the list lock is dropped around osc_build_req(); on
 * failure it returns ERR_PTR and the gathered pages are completed
 * with the error below after the lock is retaken. */
2520 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2522 /* this should happen rarely and is pretty bad, it makes the
2523 * pending list not follow the dirty order */
2524 client_obd_list_lock(&cli->cl_loi_list_lock);
2525 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2526 list_del_init(&oap->oap_rpc_item);
2528 /* queued sync pages can be torn down while the pages
2529 * were between the pending list and the rpc */
2530 if (oap->oap_interrupted) {
2531 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2532 osc_ap_completion(cli, NULL, oap, 0,
2536 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2538 loi_list_maint(cli, loi);
2539 RETURN(PTR_ERR(req));
2542 aa = ptlrpc_req_async_args(req);
/* update lprocfs read/write histograms for this RPC */
2543 if (cmd == OBD_BRW_READ) {
2544 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2545 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2546 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2547 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2549 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2550 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2551 cli->cl_w_in_flight);
2552 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2553 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2555 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2557 client_obd_list_lock(&cli->cl_loi_list_lock);
2559 if (cmd == OBD_BRW_READ)
2560 cli->cl_r_in_flight++;
2562 cli->cl_w_in_flight++;
2564 /* queued sync pages can be torn down while the pages
2565 * were between the pending list and the rpc */
2567 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2568 /* only one oap gets a request reference */
2571 if (oap->oap_interrupted && !req->rq_intr) {
2572 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2574 ptlrpc_mark_interrupted(req);
2578 tmp->oap_request = ptlrpc_request_addref(req);
2580 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2581 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
/* hand the request to the ptlrpcd daemon; brw_interpret runs at completion */
2583 req->rq_interpret_reply = brw_interpret;
2584 ptlrpcd_add_req(req);
/* Debug helper: dump an object's readiness and the pending/urgent state of
 * its read and write page lists in one CDEBUG line. */
2588 #define LOI_DEBUG(LOI, STR, args...) \
2589 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2590 !list_empty(&(LOI)->loi_ready_item) || \
2591 !list_empty(&(LOI)->loi_hp_ready_item), \
2592 (LOI)->loi_write_lop.lop_num_pending, \
2593 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2594 (LOI)->loi_read_lop.lop_num_pending, \
2595 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2598 /* This is called by osc_check_rpcs() to find which objects have pages that
2599 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection priority: high-priority (blocked-lock) objects first, then
 * ready objects, then write-queued objects when cache waiters exist, and
 * finally anything queued when the import is invalid (flush everything). */
2600 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2603 /* First return objects that have blocked locks so that they
2604 * will be flushed quickly and other clients can get the lock,
2605 * then objects which have pages ready to be stuffed into RPCs */
2606 if (!list_empty(&cli->cl_loi_hp_ready_list))
2607 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2608 struct lov_oinfo, loi_hp_ready_item));
2609 if (!list_empty(&cli->cl_loi_ready_list))
2610 RETURN(list_entry(cli->cl_loi_ready_list.next,
2611 struct lov_oinfo, loi_ready_item));
2613 /* then if we have cache waiters, return all objects with queued
2614 * writes. This is especially important when many small files
2615 * have filled up the cache and not been fired into rpcs because
2616 * they don't pass the nr_pending/object threshold */
2617 if (!list_empty(&cli->cl_cache_waiters) &&
2618 !list_empty(&cli->cl_loi_write_list))
2619 RETURN(list_entry(cli->cl_loi_write_list.next,
2620 struct lov_oinfo, loi_write_item));
2622 /* then return all queued objects when we have an invalid import
2623 * so that they get flushed */
2624 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2625 if (!list_empty(&cli->cl_loi_write_list))
2626 RETURN(list_entry(cli->cl_loi_write_list.next,
2627 struct lov_oinfo, loi_write_item));
2628 if (!list_empty(&cli->cl_loi_read_list))
2629 RETURN(list_entry(cli->cl_loi_read_list.next,
2630 struct lov_oinfo, loi_read_item));
/* Return non-zero when this client is at its RPC-in-flight limit for the
 * given object. If the object has a high-priority (ASYNC_HP) urgent page,
 * the limit is raised by one so the HP RPC can still be sent. */
2635 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2637 struct osc_async_page *oap;
2640 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2641 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2642 struct osc_async_page, oap_urgent_item);
2643 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2646 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2647 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2648 struct osc_async_page, oap_urgent_item);
2649 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2652 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2655 /* called with the loi list lock held */
/* Walk the objects returned by osc_next_loi() and fire pending read/write
 * RPCs for each, respecting the max-RPCs-in-flight limit and alternating
 * between reads and writes within an object for fairness. */
2656 static void osc_check_rpcs(struct client_obd *cli)
2658 struct lov_oinfo *loi;
2659 int rc = 0, race_counter = 0;
2662 while ((loi = osc_next_loi(cli)) != NULL) {
2663 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2665 if (osc_max_rpc_in_flight(cli, loi))
2668 /* attempt some read/write balancing by alternating between
2669 * reads and writes in an object. The makes_rpc checks here
2670 * would be redundant if we were getting read/write work items
2671 * instead of objects. we don't want send_oap_rpc to drain a
2672 * partial read pending queue when we're given this object to
2673 * do io on writes while there are cache waiters */
2674 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2675 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2676 &loi->loi_write_lop);
2684 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2685 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2686 &loi->loi_read_lop);
2695 /* attempt some inter-object balancing by issuing rpcs
2696 * for each object in turn */
2697 if (!list_empty(&loi->loi_hp_ready_item))
2698 list_del_init(&loi->loi_hp_ready_item);
2699 if (!list_empty(&loi->loi_ready_item))
2700 list_del_init(&loi->loi_ready_item);
2701 if (!list_empty(&loi->loi_write_item))
2702 list_del_init(&loi->loi_write_item);
2703 if (!list_empty(&loi->loi_read_item))
2704 list_del_init(&loi->loi_read_item);
2706 loi_list_maint(cli, loi);
2708 /* send_oap_rpc fails with 0 when make_ready tells it to
2709 * back off. llite's make_ready does this when it tries
2710 * to lock a page queued for write that is already locked.
2711 * we want to try sending rpcs from many objects, but we
2712 * don't want to spin failing with 0. */
2713 if (race_counter == 10)
2719 /* we're trying to queue a page in the osc so we're subject to the
2720 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2721 * If the osc's queued pages are already at that limit, then we want to sleep
2722 * until there is space in the osc's queue for us. We also may be waiting for
2723 * write credits from the OST if there are RPCs in flight that may return some
2724 * before we fall back to sync writes.
2726 * We need this to know our allocation was granted in the presence of signals */
/* Wait condition for l_wait_event() in osc_enter_cache(): true once the
 * waiter was removed from cl_cache_waiters (grant arrived) or there are no
 * RPCs in flight that could return grant. Takes/drops the loi list lock. */
2727 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2731 client_obd_list_lock(&cli->cl_loi_list_lock);
2732 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2733 client_obd_list_unlock(&cli->cl_loi_list_lock);
2737 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2738 * grant or cache space. */
/* Account one page against the client's dirty-cache and grant limits,
 * blocking (interruptibly) as a cache waiter when the limits are hit and
 * in-flight writes may return grant. Forces sync IO when caching is off. */
2739 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2740 struct osc_async_page *oap)
2742 struct osc_cache_waiter ocw;
2743 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2746 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2747 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2748 cli->cl_dirty_max, obd_max_dirty_pages,
2749 cli->cl_lost_grant, cli->cl_avail_grant);
2751 /* force the caller to try sync io. this can jump the list
2752 * of queued writes and create a discontiguous rpc stream */
2753 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2754 loi->loi_ar.ar_force_sync)
2757 /* Hopefully normal case - cache space and write credits available */
2758 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2759 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2760 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2761 /* account for ourselves */
2762 osc_consume_write_grant(cli, &oap->oap_brw_page);
2766 /* It is safe to block as a cache waiter as long as there is grant
2767 * space available or the hope of additional grant being returned
2768 * when an in flight write completes. Using the write back cache
2769 * if possible is preferable to sending the data synchronously
2770 * because write pages can then be merged in to large requests.
2771 * The addition of this cache waiter will cause pending write
2772 * pages to be sent immediately. */
2773 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2774 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2775 cfs_waitq_init(&ocw.ocw_waitq);
2779 loi_list_maint(cli, loi);
2780 osc_check_rpcs(cli);
2781 client_obd_list_unlock(&cli->cl_loi_list_lock);
2783 CDEBUG(D_CACHE, "sleeping for cache space\n");
2784 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2786 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still on the waiter list means we were woken without being granted
 * (signal or no grant available); remove ourselves */
2787 if (!list_empty(&ocw.ocw_entry)) {
2788 list_del(&ocw.ocw_entry);
/* Check whether an existing DLM extent lock (from the passed-in handle, or
 * cached on the async page) covers [start, end] for the given rw mode using
 * ldlm_lock_fast_match(); drops the extra reference when only validating. */
2797 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2798 void **res, int rw, obd_off start, obd_off end,
2799 struct lustre_handle *lockh, int flags)
2801 struct ldlm_lock *lock = NULL;
2802 int rc, release = 0;
2806 if (lockh && lustre_handle_is_used(lockh)) {
2807 /* if a valid lockh is passed, just check that the corresponding
2808 * lock covers the extent */
2809 lock = ldlm_handle2lock(lockh);
2812 struct osc_async_page *oap = *res;
2813 spin_lock(&oap->oap_lock);
2814 lock = oap->oap_ldlm_lock;
2816 LDLM_LOCK_GET(lock);
2817 spin_unlock(&oap->oap_lock);
2819 /* lock can be NULL in case race obd_get_lock vs lock cancel
2820 * so we should not try to match it */
2821 if (unlikely(!lock))
2824 rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2825 if (release == 1 && rc == 1)
2826 /* if a valid lockh was passed, we just need to check
2827 * that the lock covers the page, no reference should be
2829 ldlm_lock_decref(lockh,
2830 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2831 LDLM_LOCK_PUT(lock);
/* Initialize an osc_async_page cookie for one cache page: record caller
 * ops/data, page and object offset, init its list heads and lock, and
 * (unless OBD_PAGE_NO_CACHE) register the page with the extent cache.
 * NOTE(review): the size_round(sizeof(*oap)) return appears to report the
 * required cookie size when no page is supplied — the guard is elided in
 * this excerpt; confirm against the full source. */
2835 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2836 struct lov_oinfo *loi, cfs_page_t *page,
2837 obd_off offset, struct obd_async_page_ops *ops,
2838 void *data, void **res, int flags,
2839 struct lustre_handle *lockh)
2841 struct osc_async_page *oap;
2842 struct ldlm_res_id oid = {{0}};
2848 return size_round(sizeof(*oap));
2851 oap->oap_magic = OAP_MAGIC;
2852 oap->oap_cli = &exp->exp_obd->u.cli;
2855 oap->oap_caller_ops = ops;
2856 oap->oap_caller_data = data;
2858 oap->oap_page = page;
2859 oap->oap_obj_off = offset;
2861 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2862 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2863 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2864 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2866 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2868 spin_lock_init(&oap->oap_lock);
2870 /* If the page was marked as notcacheable - don't add to any locks */
2871 if (!(flags & OBD_PAGE_NO_CACHE)) {
2872 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2873 /* This is the only place where we can call cache_add_extent
2874 without oap_lock, because this page is locked now, and
2875 the lock we are adding it to is referenced, so cannot lose
2876 any pages either. */
2877 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2882 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Convert an opaque caller cookie back to its osc_async_page, validating
 * the magic; returns ERR_PTR(-EINVAL) on a corrupt/foreign cookie. */
2886 struct osc_async_page *oap_from_cookie(void *cookie)
2888 struct osc_async_page *oap = cookie;
2889 if (oap->oap_magic != OAP_MAGIC)
2890 return ERR_PTR(-EINVAL);
/* Queue one async page for read or write: validate the cookie, check quota
 * for writes, fill in offset/count/flags, enter the write cache (which may
 * block for grant), move the page to the pending lists, and kick RPCs. */
2894 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2895 struct lov_oinfo *loi, void *cookie,
2896 int cmd, obd_off off, int count,
2897 obd_flag brw_flags, enum async_flags async_flags)
2899 struct client_obd *cli = &exp->exp_obd->u.cli;
2900 struct osc_async_page *oap;
2904 oap = oap_from_cookie(cookie);
2906 RETURN(PTR_ERR(oap));
2908 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* the page must not already be queued anywhere */
2911 if (!list_empty(&oap->oap_pending_item) ||
2912 !list_empty(&oap->oap_urgent_item) ||
2913 !list_empty(&oap->oap_rpc_item))
2916 /* check if the file's owner/group is over quota */
2917 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2918 struct obd_async_page_ops *ops;
2925 ops = oap->oap_caller_ops;
2926 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2927 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2937 loi = lsm->lsm_oinfo[0];
2939 client_obd_list_lock(&cli->cl_loi_list_lock);
2942 oap->oap_page_off = off;
2943 oap->oap_count = count;
2944 oap->oap_brw_flags = brw_flags;
2945 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2946 if (libcfs_memory_pressure_get())
2947 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2948 spin_lock(&oap->oap_lock);
2949 oap->oap_async_flags = async_flags;
2950 spin_unlock(&oap->oap_lock);
2952 if (cmd & OBD_BRW_WRITE) {
2953 rc = osc_enter_cache(cli, loi, oap);
2955 client_obd_list_unlock(&cli->cl_loi_list_lock);
2960 osc_oap_to_pending(oap);
2961 loi_list_maint(cli, loi);
2963 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2966 osc_check_rpcs(cli);
2967 client_obd_list_unlock(&cli->cl_loi_list_lock);
2972 /* aka (~was & now & flag), but this is more clear :) */
/* True when 'flag' is being newly set in the transition was -> now.
 * NOTE(review): arguments are not parenthesized in the expansion, so
 * callers must pass simple flag constants (all in-file callers do). */
2973 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Raise async flags (ASYNC_READY / ASYNC_URGENT) on a queued page under
 * oap_lock, moving it onto the urgent list when it becomes urgent, and
 * kick RPC generation. Fails with -EINVAL if the page is not pending. */
2975 static int osc_set_async_flags(struct obd_export *exp,
2976 struct lov_stripe_md *lsm,
2977 struct lov_oinfo *loi, void *cookie,
2978 obd_flag async_flags)
2980 struct client_obd *cli = &exp->exp_obd->u.cli;
2981 struct loi_oap_pages *lop;
2982 struct osc_async_page *oap;
2986 oap = oap_from_cookie(cookie);
2988 RETURN(PTR_ERR(oap));
2991 * bug 7311: OST-side locking is only supported for liblustre for now
2992 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2993 * implementation has to handle case where OST-locked page was picked
2994 * up by, e.g., ->writepage().
2996 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2997 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
3000 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3004 loi = lsm->lsm_oinfo[0];
3006 if (oap->oap_cmd & OBD_BRW_WRITE) {
3007 lop = &loi->loi_write_lop;
3009 lop = &loi->loi_read_lop;
3012 client_obd_list_lock(&cli->cl_loi_list_lock);
3013 /* oap_lock provides atomic semantics of oap_async_flags access */
3014 spin_lock(&oap->oap_lock);
3015 if (list_empty(&oap->oap_pending_item))
3016 GOTO(out, rc = -EINVAL);
3018 if ((oap->oap_async_flags & async_flags) == async_flags)
3021 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3022 oap->oap_async_flags |= ASYNC_READY;
3024 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3025 list_empty(&oap->oap_rpc_item)) {
/* HP pages go to the head of the urgent list, others to the tail */
3026 if (oap->oap_async_flags & ASYNC_HP)
3027 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3029 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3030 oap->oap_async_flags |= ASYNC_URGENT;
3031 loi_list_maint(cli, loi);
3034 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3035 oap->oap_async_flags);
3037 spin_unlock(&oap->oap_lock);
3038 osc_check_rpcs(cli);
3039 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queue one async page on an obd_io_group's pending-group list (group/sync
 * IO path). Unlike osc_queue_async_io() this does not enter the write
 * cache; ASYNC_GROUP_SYNC pages also register with the group via
 * oig_add_one(). */
3043 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
3044 struct lov_oinfo *loi,
3045 struct obd_io_group *oig, void *cookie,
3046 int cmd, obd_off off, int count,
3048 obd_flag async_flags)
3050 struct client_obd *cli = &exp->exp_obd->u.cli;
3051 struct osc_async_page *oap;
3052 struct loi_oap_pages *lop;
3056 oap = oap_from_cookie(cookie);
3058 RETURN(PTR_ERR(oap));
3060 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3063 if (!list_empty(&oap->oap_pending_item) ||
3064 !list_empty(&oap->oap_urgent_item) ||
3065 !list_empty(&oap->oap_rpc_item))
3069 loi = lsm->lsm_oinfo[0];
3071 client_obd_list_lock(&cli->cl_loi_list_lock);
3074 oap->oap_page_off = off;
3075 oap->oap_count = count;
3076 oap->oap_brw_flags = brw_flags;
3077 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3078 if (libcfs_memory_pressure_get())
3079 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3080 spin_lock(&oap->oap_lock);
3081 oap->oap_async_flags = async_flags;
3082 spin_unlock(&oap->oap_lock);
3084 if (cmd & OBD_BRW_WRITE)
3085 lop = &loi->loi_write_lop;
3087 lop = &loi->loi_read_lop;
3089 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
3090 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
3092 rc = oig_add_one(oig, &oap->oap_occ);
3095 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
3096 oap, oap->oap_page, rc);
3098 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Move every page on the lop's pending-group list onto the regular pending
 * lists (osc_oap_to_pending) and refresh the object's list placement. */
3103 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3104 struct loi_oap_pages *lop, int cmd)
3106 struct list_head *pos, *tmp;
3107 struct osc_async_page *oap;
3109 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3110 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3111 list_del(&oap->oap_pending_item);
3112 osc_oap_to_pending(oap);
3114 loi_list_maint(cli, loi);
/* Release all group-queued pages (both read and write) into the normal
 * pending machinery and kick RPC generation. */
3117 static int osc_trigger_group_io(struct obd_export *exp,
3118 struct lov_stripe_md *lsm,
3119 struct lov_oinfo *loi,
3120 struct obd_io_group *oig)
3122 struct client_obd *cli = &exp->exp_obd->u.cli;
3126 loi = lsm->lsm_oinfo[0];
3128 client_obd_list_lock(&cli->cl_loi_list_lock);
3130 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3131 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3133 osc_check_rpcs(cli);
3134 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Remove a queued async page from all OSC bookkeeping (cache accounting,
 * urgent/pending lists, extent cache). Fails with -EBUSY if the page is
 * already attached to an in-flight RPC. */
3139 static int osc_teardown_async_page(struct obd_export *exp,
3140 struct lov_stripe_md *lsm,
3141 struct lov_oinfo *loi, void *cookie)
3143 struct client_obd *cli = &exp->exp_obd->u.cli;
3144 struct loi_oap_pages *lop;
3145 struct osc_async_page *oap;
3149 oap = oap_from_cookie(cookie);
3151 RETURN(PTR_ERR(oap));
3154 loi = lsm->lsm_oinfo[0];
3156 if (oap->oap_cmd & OBD_BRW_WRITE) {
3157 lop = &loi->loi_write_lop;
3159 lop = &loi->loi_read_lop;
3162 client_obd_list_lock(&cli->cl_loi_list_lock);
3164 if (!list_empty(&oap->oap_rpc_item))
3165 GOTO(out, rc = -EBUSY);
/* give back this page's dirty/grant accounting and wake blocked writers */
3167 osc_exit_cache(cli, oap, 0);
3168 osc_wake_cache_waiters(cli);
3170 if (!list_empty(&oap->oap_urgent_item)) {
3171 list_del_init(&oap->oap_urgent_item);
3172 spin_lock(&oap->oap_lock);
3173 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3174 spin_unlock(&oap->oap_lock);
3177 if (!list_empty(&oap->oap_pending_item)) {
3178 list_del_init(&oap->oap_pending_item);
3179 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3181 loi_list_maint(cli, loi);
3182 cache_remove_extent(cli->cl_cache, oap);
3184 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3186 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* LDLM blocking/cancel callback for OSC extent locks. On LDLM_CB_BLOCKING
 * it cancels the lock; on LDLM_CB_CANCELING it removes the lock from the
 * extent cache and forwards to the client's cl_ext_lock_cancel_cb hook. */
3190 int osc_extent_blocking_cb(struct ldlm_lock *lock,
3191 struct ldlm_lock_desc *new, void *data,
3194 struct lustre_handle lockh = { 0 };
/* sanity check: small non-NULL integers in 'data' indicate corruption */
3198 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
3199 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3204 case LDLM_CB_BLOCKING:
3205 ldlm_lock2handle(lock, &lockh);
3206 rc = ldlm_cli_cancel(&lockh);
3208 CERROR("ldlm_cli_cancel failed: %d\n", rc);
3210 case LDLM_CB_CANCELING: {
3212 ldlm_lock2handle(lock, &lockh);
3213 /* This lock wasn't granted, don't try to do anything */
3214 if (lock->l_req_mode != lock->l_granted_mode)
3217 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3220 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3221 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3222 lock, new, data,flag);
3231 EXPORT_SYMBOL(osc_extent_blocking_cb);
/* Attach caller data (an inode on Linux) to a matched DLM lock's
 * l_ast_data, asserting that any pre-existing different inode is being
 * freed; also propagates LDLM_FL_NO_LRU from flags. */
3233 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3236 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3239 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3242 lock_res_and_lock(lock);
3243 #if defined (__KERNEL__) && defined (__linux__)
3244 /* Liang XXX: Darwin and Winnt checking should be added */
3245 if (lock->l_ast_data && lock->l_ast_data != data) {
3246 struct inode *new_inode = data;
3247 struct inode *old_inode = lock->l_ast_data;
3248 if (!(old_inode->i_state & I_FREEING))
3249 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3250 LASSERTF(old_inode->i_state & I_FREEING,
3251 "Found existing inode %p/%lu/%u state %lu in lock: "
3252 "setting data to %p/%lu/%u\n", old_inode,
3253 old_inode->i_ino, old_inode->i_generation,
3255 new_inode, new_inode->i_ino, new_inode->i_generation);
3258 lock->l_ast_data = data;
3259 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3260 unlock_res_and_lock(lock);
3261 LDLM_LOCK_PUT(lock);
/* Apply 'replace' to every DLM lock on this stripe's resource via
 * ldlm_resource_iterate() (used to swap l_ast_data on all locks). */
3264 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3265 ldlm_iterator_t replace, void *data)
3267 struct ldlm_res_id res_id;
3268 struct obd_device *obd = class_exp2obd(exp);
3270 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3271 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Common tail of an enqueue: translate an aborted intent enqueue into the
 * server's policy result, log the returned LVB (size/blocks/mtime), cache
 * the granted lock, and invoke the caller's oi_cb_up update callback. */
3275 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3276 struct obd_info *oinfo, int intent, int rc)
3281 /* The request was created before ldlm_cli_enqueue call. */
3282 if (rc == ELDLM_LOCK_ABORTED) {
3283 struct ldlm_reply *rep;
3285 /* swabbed by ldlm_cli_enqueue() */
3286 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3287 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3289 LASSERT(rep != NULL);
3290 if (rep->lock_policy_res1)
3291 rc = rep->lock_policy_res1;
3295 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3296 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3297 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3298 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3299 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3303 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3305 /* Call the update callback. */
3306 rc = oinfo->oi_cb_up(oinfo, rc);
/* Async-enqueue reply interpreter: finishes the DLM enqueue (unpacking the
 * LVB), runs osc_enqueue_fini(), and drops the async lock reference. */
3310 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3313 struct osc_enqueue_args *aa = data;
3314 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3315 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3316 struct ldlm_lock *lock;
3318 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3320 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3322 /* Complete obtaining the lock procedure. */
3323 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3325 &aa->oa_oi->oi_flags,
3326 &lsm->lsm_oinfo[0]->loi_lvb,
3327 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3328 lustre_swab_ost_lvb,
3329 aa->oa_oi->oi_lockh, rc);
3331 /* Complete osc stuff. */
3332 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3334 /* Release the lock for async request. */
3335 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3336 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3338 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3339 aa->oa_oi->oi_lockh, req, aa);
3340 LDLM_LOCK_PUT(lock);
3344 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3345 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3346 * other synchronous requests, however keeping some locks and trying to obtain
3347 * others may take a considerable amount of time in a case of ost failure; and
3348 * when other sync requests do not get released lock from a client, the client
3349 * is excluded from the cluster -- such scenarios make the life difficult, so
3350 * release locks just after they are obtained. */
/* Enqueue a DLM extent lock for the object: first try to match an existing
 * compatible lock (a PW lock satisfies a PR request), otherwise send an
 * enqueue RPC — asynchronously via rqset when given, else synchronously. */
3351 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3352 struct ldlm_enqueue_info *einfo,
3353 struct ptlrpc_request_set *rqset)
3355 struct ldlm_res_id res_id;
3356 struct obd_device *obd = exp->exp_obd;
3357 struct ldlm_reply *rep;
3358 struct ptlrpc_request *req = NULL;
3359 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3364 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3365 oinfo->oi_md->lsm_object_gr, &res_id);
3366 /* Filesystem lock extents are extended to page boundaries so that
3367 * dealing with the page cache is a little smoother. */
3368 oinfo->oi_policy.l_extent.start -=
3369 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3370 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3372 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3375 /* Next, search for already existing extent locks that will cover us */
3376 /* If we're trying to read, we also search for an existing PW lock. The
3377 * VFS and page cache already protect us locally, so lots of readers/
3378 * writers can share a single PW lock.
3380 * There are problems with conversion deadlocks, so instead of
3381 * converting a read lock to a write lock, we'll just enqueue a new
3384 * At some point we should cancel the read lock instead of making them
3385 * send us a blocking callback, but there are problems with canceling
3386 * locks out from other users right now, too. */
3387 mode = einfo->ei_mode;
3388 if (einfo->ei_mode == LCK_PR)
3390 mode = ldlm_lock_match(obd->obd_namespace,
3391 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3392 einfo->ei_type, &oinfo->oi_policy, mode,
3395 /* addref the lock only if not async requests and PW lock is
3396 * matched whereas we asked for PR. */
3397 if (!rqset && einfo->ei_mode != mode)
3398 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR)
3399 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3402 /* I would like to be able to ASSERT here that rss <=
3403 * kms, but I can't, for reasons which are explained in
3407 /* We already have a lock, and it's referenced */
3408 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3410 /* For async requests, decref the lock. */
3411 if (einfo->ei_mode != mode)
3412 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3414 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3422 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3423 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
3424 [DLM_LOCKREQ_OFF + 1] = 0 };
3426 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3430 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3431 size[DLM_REPLY_REC_OFF] =
3432 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3433 ptlrpc_req_set_repsize(req, 3, size);
3436 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3437 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3439 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3440 &oinfo->oi_policy, &oinfo->oi_flags,
3441 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3442 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3443 lustre_swab_ost_lvb, oinfo->oi_lockh,
3447 struct osc_enqueue_args *aa;
3448 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3449 aa = ptlrpc_req_async_args(req);
3454 req->rq_interpret_reply = osc_enqueue_interpret;
3455 ptlrpc_set_add_req(rqset, req);
3456 } else if (intent) {
3457 ptlrpc_req_finished(req);
3462 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3464 ptlrpc_req_finished(req);
/* Match an existing extent lock covering the (page-aligned) range; a read
 * request also accepts a matched PW lock, converting the reference from
 * PW to PR unless LDLM_FL_TEST_LOCK was set. */
3469 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3470 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3471 int *flags, void *data, struct lustre_handle *lockh,
3474 struct ldlm_res_id res_id;
3475 struct obd_device *obd = exp->exp_obd;
3476 int lflags = *flags;
3480 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3482 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3484 /* Filesystem lock extents are extended to page boundaries so that
3485 * dealing with the page cache is a little smoother */
3486 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3487 policy->l_extent.end |= ~CFS_PAGE_MASK;
3489 /* Next, search for already existing extent locks that will cover us */
3490 /* If we're trying to read, we also search for an existing PW lock. The
3491 * VFS and page cache already protect us locally, so lots of readers/
3492 * writers can share a single PW lock. */
3496 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3497 &res_id, type, policy, rc, lockh);
3499 osc_set_data_with_check(lockh, data, lflags);
3500 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3501 ldlm_lock_addref(lockh, LCK_PR);
3502 ldlm_lock_decref(lockh, LCK_PW);
3504 if (n_matches != NULL)
/*
 * Drop one reference on @lockh; GROUP locks are additionally cancelled
 * immediately since they are not kept in the LRU.
 * NOTE(review): excerpt omits interior lines of the original function.
 */
3511 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3512 __u32 mode, struct lustre_handle *lockh, int flags,
3517 if (unlikely(mode == LCK_GROUP))
3518 ldlm_lock_decref_and_cancel(lockh, mode);
3520 ldlm_lock_decref(lockh, mode);
/*
 * Cancel all unused locks in this namespace, optionally restricted to the
 * resource derived from @lsm (when non-NULL, resp points at res_id;
 * otherwise resp stays NULL and all resources are covered).
 */
3525 static int osc_cancel_unused(struct obd_export *exp,
3526 struct lov_stripe_md *lsm, int flags, void *opaque)
3528 struct obd_device *obd = class_exp2obd(exp);
3529 struct ldlm_res_id res_id, *resp = NULL;
3532 resp = osc_build_res_name(lsm->lsm_object_id,
3533 lsm->lsm_object_gr, &res_id);
3536 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * Ask the LDLM client to add (@join != 0) or remove the locks of the
 * resource described by @lsm to/from the namespace LRU.
 */
3540 static int osc_join_lru(struct obd_export *exp,
3541 struct lov_stripe_md *lsm, int join)
3543 struct obd_device *obd = class_exp2obd(exp);
3544 struct ldlm_res_id res_id, *resp = NULL;
3547 resp = osc_build_res_name(lsm->lsm_object_id,
3548 lsm->lsm_object_gr, &res_id);
3551 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/*
 * Reply interpreter for an async OST_STATFS request: unpack the obd_statfs
 * reply, refresh the object-creator (oscc) DEGRADED/RDONLY/NOSPC flags from
 * the reported server state, copy the stats to the caller's buffer, and
 * invoke the caller's completion callback.
 * NOTE(review): excerpt omits interior lines of the original function.
 */
3555 static int osc_statfs_interpret(struct ptlrpc_request *req,
3558 struct osc_async_args *aa = data;
3559 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3560 struct obd_statfs *msfs;
3565 /* The request has in fact never been sent
3566 * due to issues at a higher level (LOV).
3567 * Exit immediately since the caller is
3568 * aware of the problem and takes care
3569 * of the clean up */
/* NODELAY statfs (e.g. from procfs) must not be retried on a dead import */
3572 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3573 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3579 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3580 lustre_swab_obd_statfs);
3582 CERROR("Can't unpack obd_statfs\n");
3583 GOTO(out, rc = -EPROTO);
3586 /* Reinitialize the RDONLY and DEGRADED flags at the client
3587 * on each statfs, so they don't stay set permanently. */
3588 spin_lock(&cli->cl_oscc.oscc_lock);
3590 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3591 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3592 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3593 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3595 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3596 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3597 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3598 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3600 /* Add a bit of hysteresis so this flag isn't continually flapping,
3601 * and ensure that new files don't get extremely fragmented due to
3602 * only a small amount of available space in the filesystem.
3603 * We want to set the NOSPC flag when there is less than ~0.1% free
3604 * and clear it when there is at least ~0.2% free space, so:
3605 * avail < ~0.1% max max = avail + used
3606 * 1025 * avail < avail + used used = blocks - free
3607 * 1024 * avail < used
3608 * 1024 * avail < blocks - free
3609 * avail < ((blocks - free) >> 10)
3611 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3612 * lose that amount of space so in those cases we report no space left
3613 * if their is less than 1 GB left. */
3614 used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3615 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3616 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3617 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3618 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3619 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3620 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3622 spin_unlock(&cli->cl_oscc.oscc_lock);
3624 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
/* hand the (possibly error) status up to the caller's callback */
3626 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_STATFS request on @rqset; the reply is handled
 * by osc_statfs_interpret().  @max_age is currently unused on the wire (see
 * comment below).  NOTE(review): excerpt omits interior lines.
 */
3630 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3631 __u64 max_age, struct ptlrpc_request_set *rqset)
3633 struct ptlrpc_request *req;
3634 struct osc_async_args *aa;
3635 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3638 /* We could possibly pass max_age in the request (as an absolute
3639 * timestamp or a "seconds.usec ago") so the target can avoid doing
3640 * extra calls into the filesystem if that isn't necessary (e.g.
3641 * during mount that would help a bit). Having relative timestamps
3642 * is not so great if request processing is slow, while absolute
3643 * timestamps are not ideal because they need time synchronization. */
3644 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3645 OST_STATFS, 1, NULL, NULL);
3649 ptlrpc_req_set_repsize(req, 2, size);
/* statfs is served from the create portal on the OST */
3650 req->rq_request_portal = OST_CREATE_PORTAL;
3651 ptlrpc_at_set_req_timeout(req);
3652 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3653 /* procfs requests not want stat in wait for avoid deadlock */
3654 req->rq_no_resend = 1;
3655 req->rq_no_delay = 1;
3658 req->rq_interpret_reply = osc_statfs_interpret;
3659 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3660 aa = ptlrpc_req_async_args(req);
3663 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the request, wait, unpack the obd_statfs
 * reply and copy it into @osfs.  Takes a reference on the import under
 * cl_sem to serialize against disconnect (bug 15684).
 * NOTE(review): excerpt omits interior lines of the original function.
 */
3667 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3668 __u64 max_age, __u32 flags)
3670 struct obd_statfs *msfs;
3671 struct ptlrpc_request *req;
3672 struct obd_import *imp = NULL;
3673 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3677 /*Since the request might also come from lprocfs, so we need
3678 *sync this with client_disconnect_export Bug15684*/
3679 down_read(&obd->u.cli.cl_sem);
3680 if (obd->u.cli.cl_import)
3681 imp = class_import_get(obd->u.cli.cl_import);
3682 up_read(&obd->u.cli.cl_sem);
3686 /* We could possibly pass max_age in the request (as an absolute
3687 * timestamp or a "seconds.usec ago") so the target can avoid doing
3688 * extra calls into the filesystem if that isn't necessary (e.g.
3689 * during mount that would help a bit). Having relative timestamps
3690 * is not so great if request processing is slow, while absolute
3691 * timestamps are not ideal because they need time synchronization. */
3692 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3693 OST_STATFS, 1, NULL, NULL);
/* request holds its own import ref; drop ours now */
3695 class_import_put(imp);
3699 ptlrpc_req_set_repsize(req, 2, size);
3700 req->rq_request_portal = OST_CREATE_PORTAL;
3701 ptlrpc_at_set_req_timeout(req);
3703 if (flags & OBD_STATFS_NODELAY) {
3704 /* procfs requests not want stat in wait for avoid deadlock */
3705 req->rq_no_resend = 1;
3706 req->rq_no_delay = 1;
3709 rc = ptlrpc_queue_wait(req);
3713 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3714 lustre_swab_obd_statfs);
3716 CERROR("Can't unpack obd_statfs\n");
3717 GOTO(out, rc = -EPROTO);
3720 memcpy(osfs, msfs, sizeof(*osfs));
3724 ptlrpc_req_finished(req);
3728 /* Retrieve object striping information.
3730 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3731 * the maximum number of OST indices which will fit in the user buffer.
3732 * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
 *
 * Copies a one-stripe lov_user_md (v1 or v3, per the magic the caller
 * supplied) describing @lsm back to user space at @lump.  Handles byte
 * swapping when the in-core lsm is in swabbed (non-host) order.
 * NOTE(review): excerpt omits interior lines of the original function.
 */
3734 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3736 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3737 struct lov_user_md_v3 lum, *lumk;
3738 int rc = 0, lum_size;
3739 struct lov_user_ost_data_v1 *lmm_objects;
3745 /* we only need the header part from user space to get lmm_magic and
3746 * lmm_stripe_count, (the header part is common to v1 and v3) */
3747 lum_size = sizeof(struct lov_user_md_v1);
3748 memset(&lum, 0x00, sizeof(lum));
3749 if (copy_from_user(&lum, lump, lum_size))
3752 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3753 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3756 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3757 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3758 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3759 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3761 /* we can use lov_mds_md_size() to compute lum_size
3762 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3763 if (lum.lmm_stripe_count > 0) {
3764 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3765 OBD_ALLOC(lumk, lum_size);
/* v1 and v3 place lmm_objects[] at different offsets */
3768 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3769 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3771 lmm_objects = &(lumk->lmm_objects[0]);
3772 lmm_objects->l_object_id = lsm->lsm_object_id;
3774 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3778 lumk->lmm_magic = lum.lmm_magic;
3779 lumk->lmm_stripe_count = 1;
3780 lumk->lmm_object_id = lsm->lsm_object_id;
3782 if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
3783 (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
3784 /* lsm not in host order, so count also need be in same order */
3785 __swab32s(&lumk->lmm_magic);
3786 __swab16s(&lumk->lmm_stripe_count);
3787 lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
3788 if (lum.lmm_stripe_count > 0)
3789 lustre_swab_lov_user_md_objects(
3790 (struct lov_user_md_v1*)lumk);
3793 if (copy_to_user(lump, lumk, lum_size))
3797 OBD_FREE(lumk, lum_size);
/*
 * ioctl dispatcher for the OSC device.  Pins the module for the duration of
 * the call, then switches on @cmd: LOV config query, stripe get/set, client
 * recovery, import (de)activation, quota check polling, object destroy,
 * target ping.  NOTE(review): excerpt omits interior lines (case breaks,
 * RETURNs) of the original function.
 */
3803 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3804 void *karg, void *uarg)
3806 struct obd_device *obd = exp->exp_obd;
3807 struct obd_ioctl_data *data = karg;
3811 if (!try_module_get(THIS_MODULE)) {
3812 CERROR("Can't get module. Is it alive?");
3816 case OBD_IOC_LOV_GET_CONFIG: {
3818 struct lov_desc *desc;
3819 struct obd_uuid uuid;
3823 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3824 GOTO(out, err = -EINVAL);
3826 data = (struct obd_ioctl_data *)buf;
/* validate the user-supplied buffer sizes before writing into them */
3828 if (sizeof(*desc) > data->ioc_inllen1) {
3829 obd_ioctl_freedata(buf, len);
3830 GOTO(out, err = -EINVAL);
3833 if (data->ioc_inllen2 < sizeof(uuid)) {
3834 obd_ioctl_freedata(buf, len);
3835 GOTO(out, err = -EINVAL);
/* an OSC presents itself as a single-target, single-stripe LOV */
3838 desc = (struct lov_desc *)data->ioc_inlbuf1;
3839 desc->ld_tgt_count = 1;
3840 desc->ld_active_tgt_count = 1;
3841 desc->ld_default_stripe_count = 1;
3842 desc->ld_default_stripe_size = 0;
3843 desc->ld_default_stripe_offset = 0;
3844 desc->ld_pattern = 0;
3845 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3847 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3849 err = copy_to_user((void *)uarg, buf, len);
3852 obd_ioctl_freedata(buf, len);
3855 case LL_IOC_LOV_SETSTRIPE:
3856 err = obd_alloc_memmd(exp, karg);
3860 case LL_IOC_LOV_GETSTRIPE:
3861 err = osc_getstripe(karg, uarg);
3863 case OBD_IOC_CLIENT_RECOVER:
3864 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3869 case IOC_OSC_SET_ACTIVE:
3870 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3873 case OBD_IOC_POLL_QUOTACHECK:
3874 err = lquota_poll_check(quota_interface, exp,
3875 (struct if_quotacheck *)karg);
3877 case OBD_IOC_DESTROY: {
/* destroying objects requires admin capability */
3880 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3881 GOTO (out, err = -EPERM);
3882 oa = &data->ioc_obdo1;
3885 GOTO(out, err = -EINVAL);
3887 oa->o_valid |= OBD_MD_FLGROUP;
3889 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3892 case OBD_IOC_PING_TARGET:
3893 err = ptlrpc_obd_ping(obd);
3896 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3897 cmd, cfs_curproc_comm());
3898 GOTO(out, err = -ENOTTY);
3901 module_put(THIS_MODULE);
/*
 * obd_get_info handler.  Local keys (LOCK_TO_STRIPE, OFF_RPCSIZE) are
 * answered from client state; LAST_ID and FIEMAP are forwarded to the OST
 * as OST_GET_INFO RPCs and the reply is copied into @val.
 * NOTE(review): excerpt omits interior lines of the original function.
 */
3905 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3906 void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3909 if (!vallen || !val)
3912 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3913 __u32 *stripe = val;
3914 *vallen = sizeof(*stripe);
3917 } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
3918 struct client_obd *cli = &exp->exp_obd->u.cli;
3919 __u64 *rpcsize = val;
3920 LASSERT(*vallen == sizeof(__u64));
3921 *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
3923 } else if (KEY_IS(KEY_LAST_ID)) {
3924 struct ptlrpc_request *req;
3926 char *bufs[2] = { NULL, key };
3927 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3930 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3931 OST_GET_INFO, 2, size, bufs);
/* reply buffer is sized by the caller's expectation */
3935 size[REPLY_REC_OFF] = *vallen;
3936 ptlrpc_req_set_repsize(req, 2, size);
3937 rc = ptlrpc_queue_wait(req);
3941 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3942 lustre_swab_ost_last_id);
3943 if (reply == NULL) {
3944 CERROR("Can't unpack OST last ID\n");
3945 GOTO(out, rc = -EPROTO);
3947 *((obd_id *)val) = *reply;
3949 ptlrpc_req_finished(req);
3951 } else if (KEY_IS(KEY_FIEMAP)) {
3952 struct ptlrpc_request *req;
3953 struct ll_user_fiemap *reply;
3954 char *bufs[2] = { NULL, key };
3955 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3958 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3959 OST_GET_INFO, 2, size, bufs);
3963 size[REPLY_REC_OFF] = *vallen;
3964 ptlrpc_req_set_repsize(req, 2, size);
3966 rc = ptlrpc_queue_wait(req);
3969 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3970 lustre_swab_fiemap);
3971 if (reply == NULL) {
3972 CERROR("Can't unpack FIEMAP reply.\n");
3973 GOTO(out1, rc = -EPROTO);
3976 memcpy(val, reply, *vallen);
3979 ptlrpc_req_finished(req);
/*
 * Reply interpreter for the KEY_MDS_CONN set_info RPC: connect the llog
 * originator context and mark the import as server-timeout/pingable so the
 * MDS's OSC pings its OST.  NOTE(review): excerpt omits interior lines.
 */
3987 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3990 struct llog_ctxt *ctxt;
3991 struct obd_import *imp = req->rq_import;
3997 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4000 rc = llog_initiator_connect(ctxt);
4002 CERROR("cannot establish connection for "
4003 "ctxt %p: %d\n", ctxt, rc);
4006 llog_ctxt_put(ctxt);
4007 spin_lock(&imp->imp_lock);
4008 imp->imp_server_timeout = 1;
4009 imp->imp_pingable = 1;
4010 spin_unlock(&imp->imp_lock);
4011 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * obd_set_info_async handler.  Keys handled purely on the client
 * (NEXT_ID, INIT_RECOV, CHECKSUM) update local state and return; all other
 * keys are packed into an OST_SET_INFO RPC.  GRANT_SHRINK requests go via
 * ptlrpcd with a grant-shrink interpreter; everything else is added to the
 * caller's @set.  NOTE(review): excerpt omits interior lines.
 */
4016 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4017 void *key, obd_count vallen, void *val,
4018 struct ptlrpc_request_set *set)
4020 struct ptlrpc_request *req;
4021 struct obd_device *obd = exp->exp_obd;
4022 struct obd_import *imp = class_exp2cliimp(exp);
4023 __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
4024 char *bufs[3] = { NULL, key, val };
4027 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4029 if (KEY_IS(KEY_NEXT_ID)) {
4031 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4033 if (vallen != sizeof(obd_id))
4036 /* avoid race between allocate new object and set next id
4037 * from ll_sync thread */
4038 spin_lock(&oscc->oscc_lock);
/* oscc_next_id only moves forward */
4039 new_val = *((obd_id*)val) + 1;
4040 if (new_val > oscc->oscc_next_id)
4041 oscc->oscc_next_id = new_val;
4042 spin_unlock(&oscc->oscc_lock);
4044 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4045 exp->exp_obd->obd_name,
4046 oscc->oscc_next_id);
4051 if (KEY_IS(KEY_INIT_RECOV)) {
4052 if (vallen != sizeof(int))
4054 spin_lock(&imp->imp_lock);
4055 imp->imp_initial_recov = *(int *)val;
4056 spin_unlock(&imp->imp_lock);
4057 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
4058 exp->exp_obd->obd_name,
4059 imp->imp_initial_recov);
4063 if (KEY_IS(KEY_CHECKSUM)) {
4064 if (vallen != sizeof(int))
4066 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* only GRANT_SHRINK may proceed without a caller-supplied request set */
4070 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4073 /* We pass all other commands directly to OST. Since nobody calls osc
4074 methods directly and everybody is supposed to go through LOV, we
4075 assume lov checked invalid values for us.
4076 The only recognised values so far are evict_by_nid and mds_conn.
4077 Even if something bad goes through, we'd get a -EINVAL from OST
4080 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
4085 if (KEY_IS(KEY_MDS_CONN))
4086 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4087 else if (KEY_IS(KEY_GRANT_SHRINK))
4088 req->rq_interpret_reply = osc_shrink_grant_interpret;
4090 if (KEY_IS(KEY_GRANT_SHRINK)) {
4091 struct osc_grant_args *aa;
4094 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4095 aa = ptlrpc_req_async_args(req);
4098 ptlrpc_req_finished(req);
4101 *oa = ((struct ost_body *)val)->oa;
4105 ptlrpc_req_set_repsize(req, 2, size);
/* grant shrink is driven by ptlrpcd rather than the caller's set */
4106 ptlrpcd_add_req(req);
4108 ptlrpc_req_set_repsize(req, 1, NULL);
4109 ptlrpc_set_add_req(set, req);
4110 ptlrpc_check_set(set);
/* llog operations for the size-replication context: only cancel is needed
 * on the client side */
4117 static struct llog_operations osc_size_repl_logops = {
4118 lop_cancel: llog_obd_repl_cancel
/* originator ops are filled in at module init from llog_lvfs_ops (see
 * osc_init) */
4121 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts used by an MDS-side OSC: the MDS->OST
 * originator catalog (id read from/written to the CATLIST file on
 * @disk_obd) and the size-replication context.  The catalog-list file is
 * serialized by obd_llog_cat_process.  NOTE(review): excerpt omits
 * interior lines (error-path structure) of the original function.
 */
4122 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4125 struct llog_catid catid;
4126 static char name[32] = CATLIST;
4132 mutex_down(&disk_obd->obd_llog_cat_process);
4134 rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4136 CERROR("rc: %d\n", rc);
4137 GOTO(out_unlock, rc);
4140 CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4141 obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4142 catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4145 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4146 &catid.lci_logid, &osc_mds_ost_orig_logops);
4148 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4152 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4153 &osc_size_repl_logops);
4155 struct llog_ctxt *ctxt =
4156 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4159 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4163 CERROR("osc '%s' tgt '%s' rc=%d\n",
4164 obd->obd_name, disk_obd->obd_name, rc);
4165 CERROR("logid "LPX64":0x%x\n",
4166 catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
/* write back the (possibly new) catalog id */
4168 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4171 CERROR("rc: %d\n", rc);
4174 mutex_up(&disk_obd->obd_llog_cat_process);
/*
 * Tear down both llog contexts created by osc_llog_init().  Both cleanups
 * are attempted; the result combination is outside this excerpt.
 */
4179 static int osc_llog_finish(struct obd_device *obd, int count)
4181 struct llog_ctxt *ctxt;
4182 int rc = 0, rc2 = 0;
4185 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4187 rc = llog_cleanup(ctxt);
4189 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4191 rc2 = llog_cleanup(ctxt);
/*
 * Reconnect hook: if the server supports grants, re-request our available
 * grant (or a 2-RPC minimum when none remains) and fold any grant lost
 * while disconnected back into the request.  Interface unchanged.
 */
4198 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4199 struct obd_uuid *cluuid,
4200 struct obd_connect_data *data,
4203 struct client_obd *cli = &obd->u.cli;
4205 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4208 client_obd_list_lock(&cli->cl_loi_list_lock);
4209 data->ocd_grant = cli->cl_avail_grant ?:
4210 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4211 lost_grant = cli->cl_lost_grant;
4212 cli->cl_lost_grant = 0;
4213 client_obd_list_unlock(&cli->cl_loi_list_lock);
4215 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4216 "cl_lost_grant: %ld\n", data->ocd_grant,
4217 cli->cl_avail_grant, lost_grant);
4218 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4219 " ocd_grant: %d\n", data->ocd_connect_flags,
4220 data->ocd_version, data->ocd_grant);
/*
 * Disconnect hook: flush pending size-replication llog cancels on the last
 * connection, disconnect the export, and only then remove this client from
 * the grant-shrink list (ordering explained below, BUG18662).
 * NOTE(review): excerpt omits interior lines of the original function.
 */
4226 static int osc_disconnect(struct obd_export *exp)
4228 struct obd_device *obd = class_exp2obd(exp);
4229 struct llog_ctxt *ctxt;
4232 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4234 if (obd->u.cli.cl_conn_count == 1) {
4235 /* Flush any remaining cancel messages out to the
4237 llog_sync(ctxt, exp);
4239 llog_ctxt_put(ctxt);
4241 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4245 rc = client_disconnect_export(exp);
4247 * Initially we put del_shrink_grant before disconnect_export, but it
4248 * causes the following problem if setup (connect) and cleanup
4249 * (disconnect) are tangled together.
4250 * connect p1 disconnect p2
4251 * ptlrpc_connect_import
4252 * ............... class_manual_cleanup
4255 * ptlrpc_connect_interrupt
4257 * add this client to shrink list
4259 * Bang! pinger trigger the shrink.
4260 * So the osc should be disconnected from the shrink list, after we
4261 * are sure the import has been destroyed. BUG18662
4263 if (obd->u.cli.cl_import == NULL)
4264 osc_del_shrink_grant(&obd->u.cli);
/*
 * React to import state transitions: reset grants and the object creator
 * on disconnect, flush pages and locks on invalidate, notify the observer
 * on (in)active, and apply connect-data (grant, request portal) on OCD.
 * "MDS OSC" checks use imp_server_timeout, which is only set for the OSC
 * instances living on the MDS.  NOTE(review): excerpt omits interior lines
 * (case breaks, RETURN) of the original function.
 */
4268 static int osc_import_event(struct obd_device *obd,
4269 struct obd_import *imp,
4270 enum obd_import_event event)
4272 struct client_obd *cli;
4276 LASSERT(imp->imp_obd == obd);
4279 case IMP_EVENT_DISCON: {
4280 /* Only do this on the MDS OSC's */
4281 if (imp->imp_server_timeout) {
4282 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4284 spin_lock(&oscc->oscc_lock);
4285 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4286 spin_unlock(&oscc->oscc_lock);
/* grants are invalid across a disconnect */
4289 client_obd_list_lock(&cli->cl_loi_list_lock);
4290 cli->cl_avail_grant = 0;
4291 cli->cl_lost_grant = 0;
4292 client_obd_list_unlock(&cli->cl_loi_list_lock);
4293 ptlrpc_import_setasync(imp, -1);
4297 case IMP_EVENT_INACTIVE: {
4298 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4301 case IMP_EVENT_INVALIDATE: {
4302 struct ldlm_namespace *ns = obd->obd_namespace;
4306 client_obd_list_lock(&cli->cl_loi_list_lock);
4307 /* all pages go to failing rpcs due to the invalid import */
4308 osc_check_rpcs(cli);
4309 client_obd_list_unlock(&cli->cl_loi_list_lock);
4311 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4315 case IMP_EVENT_ACTIVE: {
4316 /* Only do this on the MDS OSC's */
4317 if (imp->imp_server_timeout) {
4318 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4320 spin_lock(&oscc->oscc_lock);
4321 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4322 spin_unlock(&oscc->oscc_lock);
4324 CDEBUG(D_INFO, "notify server \n");
4325 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4328 case IMP_EVENT_OCD: {
4329 struct obd_connect_data *ocd = &imp->imp_connect_data;
4331 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4332 osc_init_grant(&obd->u.cli, ocd);
4335 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4336 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4338 ptlrpc_import_setasync(imp, 1);
4339 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4343 CERROR("Unknown import event %d\n", event);
4349 /* determine whether the lock can be canceled before replaying the lock
4350 * during recovery, see bug16774 for detailed information
4353 * zero - the lock can't be canceled
4354 * other - ok to cancel
 *
 * Only unused PR/CR extent locks are safe to cancel instead of replay;
 * GROUP locks and non-extent locks are always replayed.
 */
4356 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4358 check_res_locked(lock->l_resource);
4359 if (lock->l_granted_mode == LCK_GROUP ||
4360 lock->l_resource->lr_type != LDLM_EXTENT)
4363 /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
4364 if (lock->l_granted_mode == LCK_PR ||
4365 lock->l_granted_mode == LCK_CR)
/*
 * Device setup: take a ptlrpcd reference, run common client setup, register
 * lprocfs entries, pre-allocate a small request pool for brw_interpret,
 * create the page cache, init the grant-shrink list, and register the
 * recovery-time lock-cancel predicate.  NOTE(review): excerpt omits
 * interior lines (error paths) of the original function.
 */
4371 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4377 rc = ptlrpcd_addref();
4381 rc = client_obd_setup(obd, len, buf);
4385 struct lprocfs_static_vars lvars = { 0 };
4386 struct client_obd *cli = &obd->u.cli;
4388 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4389 lprocfs_osc_init_vars(&lvars);
4390 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4391 lproc_osc_attach_seqstat(obd);
4392 ptlrpc_lprocfs_register_obd(obd);
4396 /* We need to allocate a few requests more, because
4397 brw_interpret tries to create new requests before freeing
4398 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4399 reserved, but I afraid that might be too much wasted RAM
4400 in fact, so 2 is just my guess and still should work. */
4401 cli->cl_import->imp_rq_pool =
4402 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4404 ptlrpc_add_rqs_to_pool);
4405 cli->cl_cache = cache_create(obd);
4406 if (!cli->cl_cache) {
4410 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4411 sema_init(&cli->cl_grant_sem, 1);
4413 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/*
 * Staged pre-cleanup: EARLY deactivates the import (stopping in-flight
 * mds_lov_synchronize); EXPORTS destroys a never-connected import (freeing
 * its request pool) under cl_sem and finishes the llog contexts.
 * NOTE(review): excerpt omits interior lines (case breaks) of the original.
 */
4419 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4425 case OBD_CLEANUP_EARLY: {
4426 struct obd_import *imp;
4427 imp = obd->u.cli.cl_import;
4428 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4429 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4430 ptlrpc_deactivate_import(imp);
4433 case OBD_CLEANUP_EXPORTS: {
4434 /* If we set up but never connected, the
4435 client import will not have been cleaned. */
4436 down_write(&obd->u.cli.cl_sem);
4437 if (obd->u.cli.cl_import) {
4438 struct obd_import *imp;
4439 imp = obd->u.cli.cl_import;
4440 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4442 ptlrpc_invalidate_import(imp);
4443 if (imp->imp_rq_pool) {
4444 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4445 imp->imp_rq_pool = NULL;
4447 class_destroy_import(imp);
4448 obd->u.cli.cl_import = NULL;
4450 up_write(&obd->u.cli.cl_sem);
4452 rc = obd_llog_finish(obd, 0);
4454 CERROR("failed to cleanup llogging subsystems\n");
4457 case OBD_CLEANUP_SELF_EXP:
4459 case OBD_CLEANUP_OBD:
/*
 * Final cleanup: remove lprocfs entries, release the quota cache and the
 * page cache, then run common client teardown.
 */
4465 int osc_cleanup(struct obd_device *obd)
4470 ptlrpc_lprocfs_unregister_obd(obd);
4471 lprocfs_obd_cleanup(obd);
4473 /* free memory of osc quota cache */
4474 lquota_cleanup(quota_interface, obd);
4476 cache_destroy(obd->u.cli.cl_cache);
4477 rc = client_obd_cleanup(obd);
/*
 * Register a page-removal callback (with its pin callback) on this
 * client's extent cache; servers have no cache and skip registration.
 */
4483 static int osc_register_page_removal_cb(struct obd_device *obd,
4484 obd_page_removal_cb_t func,
4485 obd_pin_extent_cb pin_cb)
4489 /* this server - not need init */
4493 return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
/* Remove a previously registered page-removal callback from the cache. */
4497 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4498 obd_page_removal_cb_t func)
4501 return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
/*
 * Install the (single) extent-lock cancel callback; asserts none is
 * already set.  Servers skip registration.
 */
4504 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4505 obd_lock_cancel_cb cb)
4508 LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4510 /* this server - not need init */
4514 obd->u.cli.cl_ext_lock_cancel_cb = cb;
/*
 * Clear the extent-lock cancel callback, verifying the caller is removing
 * the callback that is actually installed.
 */
4518 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4519 obd_lock_cancel_cb cb)
4523 if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4524 CERROR("Unregistering cancel cb %p, while only %p was "
4526 obd->u.cli.cl_ext_lock_cancel_cb);
4530 obd->u.cli.cl_ext_lock_cancel_cb = NULL;
/* Apply an "osc.*" tunable from the config log via the lprocfs vars. */
4534 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4536 struct lustre_cfg *lcfg = buf;
4537 struct lprocfs_static_vars lvars = { 0 };
4540 lprocfs_osc_init_vars(&lvars);
4542 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/* Method table wiring the OSC into the generic OBD interface. */
4546 struct obd_ops osc_obd_ops = {
4547 .o_owner = THIS_MODULE,
/* lifecycle */
4548 .o_setup = osc_setup,
4549 .o_precleanup = osc_precleanup,
4550 .o_cleanup = osc_cleanup,
/* connection management (mostly generic client helpers) */
4551 .o_add_conn = client_import_add_conn,
4552 .o_del_conn = client_import_del_conn,
4553 .o_connect = client_connect_import,
4554 .o_reconnect = osc_reconnect,
4555 .o_disconnect = osc_disconnect,
/* statfs */
4556 .o_statfs = osc_statfs,
4557 .o_statfs_async = osc_statfs_async,
/* object metadata and attributes */
4558 .o_packmd = osc_packmd,
4559 .o_unpackmd = osc_unpackmd,
4560 .o_precreate = osc_precreate,
4561 .o_create = osc_create,
4562 .o_create_async = osc_create_async,
4563 .o_destroy = osc_destroy,
4564 .o_getattr = osc_getattr,
4565 .o_getattr_async = osc_getattr_async,
4566 .o_setattr = osc_setattr,
4567 .o_setattr_async = osc_setattr_async,
/* I/O paths */
4569 .o_brw_async = osc_brw_async,
4570 .o_prep_async_page = osc_prep_async_page,
4571 .o_get_lock = osc_get_lock,
4572 .o_queue_async_io = osc_queue_async_io,
4573 .o_set_async_flags = osc_set_async_flags,
4574 .o_queue_group_io = osc_queue_group_io,
4575 .o_trigger_group_io = osc_trigger_group_io,
4576 .o_teardown_async_page = osc_teardown_async_page,
4577 .o_punch = osc_punch,
/* DLM locking */
4579 .o_enqueue = osc_enqueue,
4580 .o_match = osc_match,
4581 .o_change_cbdata = osc_change_cbdata,
4582 .o_cancel = osc_cancel,
4583 .o_cancel_unused = osc_cancel_unused,
4584 .o_join_lru = osc_join_lru,
/* control plane */
4585 .o_iocontrol = osc_iocontrol,
4586 .o_get_info = osc_get_info,
4587 .o_set_info_async = osc_set_info_async,
4588 .o_import_event = osc_import_event,
4589 .o_llog_init = osc_llog_init,
4590 .o_llog_finish = osc_llog_finish,
4591 .o_process_config = osc_process_config,
/* cache callbacks */
4592 .o_register_page_removal_cb = osc_register_page_removal_cb,
4593 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4594 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4595 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
/*
 * Module init: bind the quota interface (via lquota), register the OSC obd
 * type, and build the originator llog ops from llog_lvfs_ops with OSC
 * overrides.  On registration failure the quota symbol ref is dropped.
 * NOTE(review): excerpt omits interior lines of the original function.
 */
4597 int __init osc_init(void)
4599 struct lprocfs_static_vars lvars = { 0 };
4603 lprocfs_osc_init_vars(&lvars);
4605 request_module("lquota");
4606 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4607 lquota_init(quota_interface);
4608 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4610 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4613 if (quota_interface)
4614 PORTAL_SYMBOL_PUT(osc_quota_interface);
/* start from the lvfs llog ops, then override the origin-side hooks */
4618 osc_mds_ost_orig_logops = llog_lvfs_ops;
4619 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4620 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4621 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4622 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: release the quota interface and unregister the obd type. */
4628 static void /*__exit*/ osc_exit(void)
4630 lquota_exit(quota_interface);
4631 if (quota_interface)
4632 PORTAL_SYMBOL_PUT(osc_quota_interface);
4634 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata and entry/exit registration. */
4637 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4638 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4639 MODULE_LICENSE("GPL");
4641 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);