1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
78 atomic_t osc_resend_time;
80 /* Pack OSC object metadata for disk storage (LE byte order).
 *
 * The packed form is a single struct lov_mds_md: lmm_size is taken from
 * the size of the object that *lmmp points at.  The visible code can
 * free *lmmp, allocate it, and store lsm->lsm_object_id into
 * lmm_object_id through cpu_to_le64(); the object id is asserted to be
 * non-zero before it is stored.
 *
 * NOTE(review): the conditions that select the free and allocate paths,
 * and the return value, are on lines not shown here -- confirm them
 * against the full function. */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82 struct lov_stripe_md *lsm)
87 lmm_size = sizeof(**lmmp);
92 OBD_FREE(*lmmp, lmm_size);
98 OBD_ALLOC(*lmmp, lmm_size);
104 LASSERT(lsm->lsm_object_id);
105 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
111 /* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Visible checks reject (with CERROR) an lmm shorter than one
 * struct lov_mds_md and an lmm whose lmm_object_id is zero.  The
 * in-memory descriptor is sized for one stripe via
 * lov_stripe_md_size(1).  When *lsmp is set and lmm is NULL, the
 * lov_oinfo and the lsm are freed.  The code can also allocate *lsmp
 * plus its single lov_oinfo (freeing *lsmp again if the oinfo
 * allocation fails), initialise the oinfo with loi_init(), copy the
 * object id out of little-endian form and set lsm_maxbytes to
 * LUSTRE_STRIPE_MAXBYTES.
 *
 * NOTE(review): lmm_bytes is an int compared against a sizeof result,
 * so a negative lmm_bytes would be converted to a large unsigned value
 * and would not be reported as too small -- confirm callers never pass
 * a negative length.
 * NOTE(review): some guards and all return statements are on lines not
 * shown here. */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113 struct lov_mds_md *lmm, int lmm_bytes)
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == 0) {
127 CERROR("lov_mds_md: zero lmm_object_id\n");
132 lsm_size = lov_stripe_md_size(1);
136 if (*lsmp != NULL && lmm == NULL) {
137 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 OBD_FREE(*lsmp, lsm_size);
144 OBD_ALLOC(*lsmp, lsm_size);
147 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149 OBD_FREE(*lsmp, lsm_size);
152 loi_init((*lsmp)->lsm_oinfo[0]);
156 /* XXX zero *lsmp? */
157 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158 LASSERT((*lsmp)->lsm_object_id);
161 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Reply interpreter that osc_getattr_async() installs in
 * rq_interpret_reply.
 *
 * Unpacks the ost_body from the reply, copies it into aa->aa_oi->oi_oa
 * with lustre_get_wire_obdo(), and forces o_blksize to
 * PTLRPC_MAX_BRW_SIZE (flagged through OBD_MD_FLBLKSZ).  The visible
 * error handling logs the unpack failure and clears o_valid.  The
 * oi_cb_up callback is then called with rc and its result is stored
 * back into rc.
 *
 * NOTE(review): the branch conditions and the return statement are on
 * lines not shown here. */
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
169 struct ost_body *body;
170 struct osc_async_args *aa = data;
176 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
177 lustre_swab_ost_body);
179 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
182 /* This should really be sent by the OST */
183 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
184 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
186 CERROR("can't unpack ost_body\n");
188 aa->aa_oi->oi_oa->o_valid = 0;
191 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR for oinfo->oi_oa on @set.
 *
 * Builds a two-buffer request (ptlrpc_body + ost_body), packs the obdo
 * into the request body, sizes the reply with the same two buffers, and
 * registers osc_getattr_interpret() as the reply interpreter before
 * adding the request to @set with ptlrpc_set_add_req().  The CLASSERT
 * guards that the async args fit inside req->rq_async_args.
 *
 * NOTE(review): the request-allocation check, the store into *aa and
 * the return value are on lines not shown here. */
195 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
196 struct ptlrpc_request_set *set)
198 struct ptlrpc_request *req;
199 struct ost_body *body;
200 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
201 struct osc_async_args *aa;
204 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
205 OST_GETATTR, 2, size,NULL);
209 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
210 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
212 ptlrpc_req_set_repsize(req, 2, size);
213 req->rq_interpret_reply = osc_getattr_interpret;
215 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
216 aa = ptlrpc_req_async_args(req);
219 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR for oinfo->oi_oa.
 *
 * Builds a two-buffer request (ptlrpc_body + ost_body), packs the obdo,
 * and blocks in ptlrpc_queue_wait().  The visible error handling logs a
 * failed RPC and jumps to the out label with -EPROTO when the reply
 * body cannot be unpacked.  On the success path the reply obdo is
 * copied back into oinfo->oi_oa and o_blksize is forced to
 * PTLRPC_MAX_BRW_SIZE (flagged through OBD_MD_FLBLKSZ).  The request is
 * released with ptlrpc_req_finished().
 *
 * NOTE(review): the branch conditions, the out label and the return
 * statement are on lines not shown here. */
223 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
225 struct ptlrpc_request *req;
226 struct ost_body *body;
227 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
231 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
232 OST_GETATTR, 2, size, NULL);
236 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
237 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
239 ptlrpc_req_set_repsize(req, 2, size);
241 rc = ptlrpc_queue_wait(req);
243 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
247 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
248 lustre_swab_ost_body);
250 CERROR ("can't unpack ost_body\n");
251 GOTO (out, rc = -EPROTO);
254 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
255 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
257 /* This should really be sent by the OST */
258 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
259 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
263 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR for oinfo->oi_oa.
 *
 * Builds a two-buffer request (ptlrpc_body + ost_body), packs the obdo,
 * waits for the reply in ptlrpc_queue_wait(), then unpacks the reply
 * body (jumping to the out label with -EPROTO on the visible error
 * path) and copies the returned obdo back into oinfo->oi_oa.  The
 * request is released with ptlrpc_req_finished().
 *
 * NOTE(review): @oti is not referenced on any visible line; the branch
 * conditions and the return statement are on lines not shown here. */
267 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
268 struct obd_trans_info *oti)
270 struct ptlrpc_request *req;
271 struct ost_body *body;
272 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
276 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
277 OST_SETATTR, 2, size, NULL);
281 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
282 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
284 ptlrpc_req_set_repsize(req, 2, size);
286 rc = ptlrpc_queue_wait(req);
290 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
291 lustre_swab_ost_body);
293 GOTO(out, rc = -EPROTO);
295 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
299 ptlrpc_req_finished(req);
/* Reply interpreter that osc_setattr_async() installs in
 * rq_interpret_reply.
 *
 * Unpacks the ost_body from the reply (the visible error path logs the
 * failure and jumps to the out label with -EPROTO), copies the returned
 * obdo into aa->aa_oi->oi_oa, and calls the oi_cb_up callback with rc,
 * storing its result back into rc.
 *
 * NOTE(review): the branch conditions, the out label and the return
 * statement are on lines not shown here. */
303 static int osc_setattr_interpret(struct ptlrpc_request *req,
306 struct ost_body *body;
307 struct osc_async_args *aa = data;
313 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
314 lustre_swab_ost_body);
316 CERROR("can't unpack ost_body\n");
317 GOTO(out, rc = -EPROTO);
320 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
322 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Send an OST_SETATTR for oinfo->oi_oa without blocking the caller.
 *
 * The request is built with bufcount buffers described by size[]; the
 * visible code tests osc_exp_is_2_0_server(), presumably to adjust the
 * buffer layout -- TODO confirm.  When OBD_MD_FLCOOKIE is set in
 * o_valid, the first llog cookie from @oti is copied into o_lcookie
 * before the obdo is packed.
 *
 * Two completion paths are visible: the request is either handed to
 * ptlrpcd_add_req() with no reply handling, or osc_setattr_interpret()
 * is installed and the request is added to @rqset.
 *
 * NOTE(review): the condition that chooses between the two paths, the
 * declaration of bufcount and the return value are on lines not shown
 * here.  oti->oti_logcookies is dereferenced with no NULL check of @oti
 * on the visible lines -- confirm one exists. */
326 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
327 struct obd_trans_info *oti,
328 struct ptlrpc_request_set *rqset)
330 struct ptlrpc_request *req;
331 struct ost_body *body;
332 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
334 struct osc_async_args *aa;
337 if (osc_exp_is_2_0_server(exp)) {
341 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
342 OST_SETATTR, bufcount, size, NULL);
346 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
348 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
350 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
353 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
354 ptlrpc_req_set_repsize(req, 2, size);
355 /* do mds to ost setattr asynchronously */
357 /* Do not wait for response. */
358 ptlrpcd_add_req(req);
360 req->rq_interpret_reply = osc_setattr_interpret;
362 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
363 aa = ptlrpc_req_async_args(req);
366 ptlrpc_set_add_req(rqset, req);
/* Synchronously create an object on the OST (OST_CREATE).
 *
 * Visible flow: an lsm can be allocated with obd_alloc_memmd(); a
 * two-buffer request is built and the obdo packed; when o_flags is
 * exactly OBD_FL_DELORPHAN (and OBD_MD_FLFLAGS is valid) the request is
 * marked no-resend and no-delay; the RPC is sent with
 * ptlrpc_queue_wait(); the reply obdo is copied back into @oa with
 * o_blksize forced to PTLRPC_MAX_BRW_SIZE; lsm_object_id is set from
 * oa->o_id; the reply transno is recorded in @oti, and when
 * OBD_MD_FLCOOKIE is set the returned cookie is stored in
 * oti->oti_logcookies (allocated on demand).  The request is released
 * with ptlrpc_req_finished() and the lsm can be freed with
 * obd_free_memmd().
 *
 * NOTE(review): how @ea is used, the branch conditions, the labels and
 * the return statement are on lines not shown here. */
372 int osc_real_create(struct obd_export *exp, struct obdo *oa,
373 struct lov_stripe_md **ea, struct obd_trans_info *oti)
375 struct ptlrpc_request *req;
376 struct ost_body *body;
377 struct lov_stripe_md *lsm;
378 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
387 rc = obd_alloc_memmd(exp, &lsm);
392 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
393 OST_CREATE, 2, size, NULL);
395 GOTO(out, rc = -ENOMEM);
397 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
398 lustre_set_wire_obdo(&body->oa, oa);
400 ptlrpc_req_set_repsize(req, 2, size);
401 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
402 oa->o_flags == OBD_FL_DELORPHAN) {
404 "delorphan from OST integration");
405 /* Don't resend the delorphan req */
406 req->rq_no_resend = req->rq_no_delay = 1;
409 rc = ptlrpc_queue_wait(req);
413 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
414 lustre_swab_ost_body);
416 CERROR ("can't unpack ost_body\n");
417 GOTO (out_req, rc = -EPROTO);
420 lustre_get_wire_obdo(oa, &body->oa);
422 /* This should really be sent by the OST */
423 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
424 oa->o_valid |= OBD_MD_FLBLKSZ;
426 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
427 * have valid lsm_oinfo data structs, so don't go touching that.
428 * This needs to be fixed in a big way.
430 lsm->lsm_object_id = oa->o_id;
434 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
436 if (oa->o_valid & OBD_MD_FLCOOKIE) {
437 if (!oti->oti_logcookies)
438 oti_alloc_cookies(oti, 1);
439 *oti->oti_logcookies = oa->o_lcookie;
443 CDEBUG(D_HA, "transno: "LPD64"\n",
444 lustre_msg_get_transno(req->rq_repmsg));
446 ptlrpc_req_finished(req);
449 obd_free_memmd(exp, &lsm);
/* Reply interpreter that osc_punch() installs in rq_interpret_reply.
 *
 * Unpacks the ost_body from the reply (the visible error path logs the
 * failure and jumps to the out label with -EPROTO), copies the returned
 * obdo into aa->aa_oi->oi_oa, and calls the oi_cb_up callback with rc,
 * storing its result back into rc.
 *
 * NOTE(review): the branch conditions, the out label and the return
 * statement are on lines not shown here. */
453 static int osc_punch_interpret(struct ptlrpc_request *req,
456 struct ost_body *body;
457 struct osc_async_args *aa = data;
463 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
464 lustre_swab_ost_body);
466 CERROR ("can't unpack ost_body\n");
467 GOTO(out, rc = -EPROTO);
470 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
472 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_PUNCH for oinfo->oi_oa on @rqset.
 *
 * The request is routed to OST_IO_PORTAL and its timeout is set with
 * ptlrpc_at_set_req_timeout().  The extent to punch is carried in the
 * wire obdo by overloading o_size with l_extent.start and o_blocks with
 * l_extent.end from oinfo->oi_policy.  osc_punch_interpret() is
 * installed as the reply interpreter before the request is added to
 * @rqset.
 *
 * NOTE(review): @oti is not referenced on any visible line; the
 * request-allocation check, the store into *aa and the return value are
 * on lines not shown here. */
476 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
477 struct obd_trans_info *oti,
478 struct ptlrpc_request_set *rqset)
480 struct ptlrpc_request *req;
481 struct osc_async_args *aa;
482 struct ost_body *body;
483 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
491 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
492 OST_PUNCH, 2, size, NULL);
496 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
497 ptlrpc_at_set_req_timeout(req);
499 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
500 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
502 /* overload the size and blocks fields in the oa with start/end */
503 body->oa.o_size = oinfo->oi_policy.l_extent.start;
504 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
505 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
507 ptlrpc_req_set_repsize(req, 2, size);
509 req->rq_interpret_reply = osc_punch_interpret;
510 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
511 aa = ptlrpc_req_async_args(req);
513 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter that osc_sync() installs in rq_interpret_reply.
 *
 * Unpacks the ost_body from the reply (the visible error path logs the
 * failure and jumps to the out label with -EPROTO), copies the returned
 * obdo into aa->aa_oi->oi_oa, and calls the oi_cb_up callback with rc,
 * storing its result back into rc.
 *
 * NOTE(review): the obdo is copied here by plain struct assignment,
 * while the getattr, setattr and punch interpreters in this file go
 * through lustre_get_wire_obdo() -- confirm whether this one should
 * too.  The branch conditions, the out label and the return statement
 * are on lines not shown here. */
518 static int osc_sync_interpret(struct ptlrpc_request *req,
521 struct ost_body *body;
522 struct osc_async_args *aa = data;
528 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529 lustre_swab_ost_body);
531 CERROR ("can't unpack ost_body\n");
532 GOTO(out, rc = -EPROTO);
535 *aa->aa_oi->oi_oa = body->oa;
537 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_SYNC for oinfo->oi_oa on @set.
 *
 * The byte range to sync is carried in the wire obdo by overloading
 * o_size with @start and o_blocks with @end.  osc_sync_interpret() is
 * installed as the reply interpreter before the request is added to
 * @set with ptlrpc_set_add_req().
 *
 * NOTE(review): the request-allocation check, the store into *aa and
 * the return value are on lines not shown here. */
541 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
542 obd_size start, obd_size end,
543 struct ptlrpc_request_set *set)
545 struct ptlrpc_request *req;
546 struct ost_body *body;
547 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
548 struct osc_async_args *aa;
556 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
557 OST_SYNC, 2, size, NULL);
561 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
562 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
564 /* overload the size and blocks fields in the oa with start/end */
565 body->oa.o_size = start;
566 body->oa.o_blocks = end;
567 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
569 ptlrpc_req_set_repsize(req, 2, size);
570 req->rq_interpret_reply = osc_sync_interpret;
572 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
573 aa = ptlrpc_req_async_args(req);
576 ptlrpc_set_add_req(set, req);
580 /* Find and locally cancel the locks matching @mode on the resource named
 * by @oa (the resource name is built from oa->o_id and oa->o_gr).
 * Matching locks are added to the @cancels list.  Returns the number of
 * locks added to @cancels.
 *
 * The resource is looked up in the namespace of the export's obd and
 * its reference is dropped again after the cancel pass.
 *
 * NOTE(review): the remaining parameter (lock_flags is used below), the
 * handling of a failed resource lookup and the return statement are on
 * lines not shown here. */
583 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
584 struct list_head *cancels, ldlm_mode_t mode,
587 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
588 struct ldlm_res_id res_id;
589 struct ldlm_resource *res;
593 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
594 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
598 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
599 lock_flags, 0, NULL);
600 ldlm_resource_putref(res);
/* Reply interpreter that osc_destroy() installs for throttled destroy
 * RPCs: drops the in-flight destroy counter of the client obd behind
 * the request import and signals cl_destroy_waitq so a sender waiting
 * for a free slot can retry.
 *
 * NOTE(review): the return statement is on a line not shown here. */
604 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
607 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
609 atomic_dec(&cli->cl_destroy_in_flight);
610 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve an in-flight slot for one destroy RPC.
 *
 * The counter cl_destroy_in_flight is incremented first; if the new
 * value does not exceed cl_max_rpcs_in_flight the destroy can be sent.
 * Otherwise the increment is undone, and if the counter has meanwhile
 * dropped below the limit (it changed between the two atomic
 * operations) cl_destroy_waitq is signalled so a waiter can retry.
 *
 * NOTE(review): the return values of both paths are on lines not shown
 * here; osc_destroy() uses the result as a boolean. */
614 static int osc_can_send_destroy(struct client_obd *cli)
616 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
617 cli->cl_max_rpcs_in_flight) {
618 /* The destroy request can be sent */
621 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
622 cli->cl_max_rpcs_in_flight) {
624 * The counter has been modified between the two atomic
627 cfs_waitq_signal(&cli->cl_destroy_waitq);
632 /* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions.
 *
 * Visible flow: oa->o_id is asserted non-zero; unused LCK_PW locks on
 * the object are collected for cancellation with LDLM_FL_DISCARD_DATA;
 * the OST_DESTROY request is built by ldlm_prep_elc_req(), which is
 * given the cancel list, and is routed to OST_IO_PORTAL.  When @oti is
 * non-NULL and OBD_MD_FLCOOKIE is set, the first llog cookie is copied
 * into oa->o_lcookie before the obdo is packed.  Unless the import was
 * connected with OBD_CONNECT_MDS, osc_destroy_interpret() is installed
 * and the sender waits on cl_destroy_waitq until
 * osc_can_send_destroy() grants a slot.  The request is finally handed
 * to ptlrpcd_add_req() without waiting for the reply.
 *
 * NOTE(review): @ea and @md_export are not referenced on any visible
 * line; the statement guarded by exp_connect_cancelset(), the
 * request-allocation check and the return value are on lines not shown
 * here. */
642 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
643 struct lov_stripe_md *ea, struct obd_trans_info *oti,
644 struct obd_export *md_export)
646 CFS_LIST_HEAD(cancels);
647 struct ptlrpc_request *req;
648 struct ost_body *body;
649 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
650 sizeof(struct ldlm_request) };
651 int count, bufcount = 2;
652 struct client_obd *cli = &exp->exp_obd->u.cli;
660 LASSERT(oa->o_id != 0);
662 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
663 LDLM_FL_DISCARD_DATA);
664 if (exp_connect_cancelset(exp))
666 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
667 size, REQ_REC_OFF + 1, 0, &cancels, count);
671 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
672 ptlrpc_at_set_req_timeout(req);
674 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
676 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
677 oa->o_lcookie = *oti->oti_logcookies;
680 lustre_set_wire_obdo(&body->oa, oa);
681 ptlrpc_req_set_repsize(req, 2, size);
683 /* don't throttle destroy RPCs for the MDT */
684 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
685 req->rq_interpret_reply = osc_destroy_interpret;
686 if (!osc_can_send_destroy(cli)) {
687 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
691 * Wait until the number of on-going destroy RPCs drops
692 * under max_rpc_in_flight
694 l_wait_event_exclusive(cli->cl_destroy_waitq,
695 osc_can_send_destroy(cli), &lwi);
699 /* Do not wait for response */
700 ptlrpcd_add_req(req);
/* Fill @oa with this client's cache and grant state for the OST.
 *
 * On entry neither OBD_MD_FLBLOCKS nor OBD_MD_FLGRANT may already be
 * set in oa->o_valid (asserted).  Under cl_loi_list_lock: o_dirty is
 * set to cl_dirty; three sanity checks log a CERROR when the dirty
 * accounting looks inconsistent; otherwise o_undirty is the larger of
 * cl_dirty_max and the byte size of (cl_max_rpcs_in_flight + 1) full
 * RPCs; o_grant is set to cl_avail_grant; o_dropped takes cl_lost_grant,
 * which is then reset to zero.
 *
 * NOTE(review): the last parameter, what o_undirty is set to in the
 * three error branches, and any update of o_valid are on lines not
 * shown here. */
704 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
707 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
709 LASSERT(!(oa->o_valid & bits));
712 client_obd_list_lock(&cli->cl_loi_list_lock);
713 oa->o_dirty = cli->cl_dirty;
714 if (cli->cl_dirty > cli->cl_dirty_max) {
715 CERROR("dirty %lu > dirty_max %lu\n",
716 cli->cl_dirty, cli->cl_dirty_max);
718 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
719 CERROR("dirty %d > system dirty_max %d\n",
720 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
722 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
723 CERROR("dirty %lu - dirty_max %lu too big???\n",
724 cli->cl_dirty, cli->cl_dirty_max);
727 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
728 (cli->cl_max_rpcs_in_flight + 1);
729 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
731 oa->o_grant = cli->cl_avail_grant;
732 oa->o_dropped = cli->cl_lost_grant;
733 cli->cl_lost_grant = 0;
734 client_obd_list_unlock(&cli->cl_loi_list_lock);
735 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
736 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Re-arm the grant-shrink deadline: cl_next_shrink_grant is set to
 * cfs_time_shift(cl_grant_shrink_interval) and the new value is logged
 * at D_CACHE. */
740 static void osc_update_next_shrink(struct client_obd *cli)
742 cli->cl_next_shrink_grant =
743 cfs_time_shift(cli->cl_grant_shrink_interval);
744 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
745 cli->cl_next_shrink_grant);
748 /* caller must hold loi_list_lock
 *
 * Charge one page of write grant to @pga: the global obd_dirty_pages
 * counter is incremented, cl_dirty grows and cl_avail_grant shrinks by
 * CFS_PAGE_SIZE, and the page is tagged OBD_BRW_FROM_GRANT.  The
 * available grant is asserted to stay non-negative, and the
 * grant-shrink deadline is pushed out through
 * osc_update_next_shrink(). */
749 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
751 atomic_inc(&obd_dirty_pages);
752 cli->cl_dirty += CFS_PAGE_SIZE;
753 cli->cl_avail_grant -= CFS_PAGE_SIZE;
754 pga->flag |= OBD_BRW_FROM_GRANT;
755 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
756 CFS_PAGE_SIZE, pga, pga->pg);
757 LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
758 cli->cl_avail_grant);
759 osc_update_next_shrink(cli);
762 /* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * Pages not tagged OBD_BRW_FROM_GRANT are handled by the first branch.
 * Otherwise the tag is cleared, obd_dirty_pages is decremented and
 * cl_dirty shrinks by CFS_PAGE_SIZE.  One branch then adds a whole page
 * to cl_lost_grant; the other, taken for a short write when the OST
 * block size differs from the page size, adds only the part of the page
 * that falls outside the blocks touched by the write.  The OST block
 * size comes from obd_osfs.os_bsize and falls back to 4096 when that
 * field is zero.
 *
 * NOTE(review): the body of the first branch, the use of @sent and the
 * condition around the rounding of count are on lines not shown here. */
764 static void osc_release_write_grant(struct client_obd *cli,
765 struct brw_page *pga, int sent)
767 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
770 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
775 pga->flag &= ~OBD_BRW_FROM_GRANT;
776 atomic_dec(&obd_dirty_pages);
777 cli->cl_dirty -= CFS_PAGE_SIZE;
779 cli->cl_lost_grant += CFS_PAGE_SIZE;
780 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
781 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
782 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
783 /* For short writes we shouldn't count parts of pages that
784 * span a whole block on the OST side, or our accounting goes
785 * wrong. Should match the code in filter_grant_check. */
786 int offset = pga->off & ~CFS_PAGE_MASK;
787 int count = pga->count + (offset & (blocksize - 1));
788 int end = (offset + pga->count) & (blocksize - 1);
790 count += blocksize - end;
792 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
793 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
794 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
795 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of RPCs this client currently has in flight: the sum of
 * the read counter cl_r_in_flight and the write counter
 * cl_w_in_flight. */
801 static unsigned long rpcs_in_flight(struct client_obd *cli)
803 return cli->cl_r_in_flight + cli->cl_w_in_flight;
806 /* caller must hold loi_list_lock
 *
 * Walk cl_cache_waiters and wake the waiters that can make progress.
 * Two conditions are checked before a waiter is dequeued: there must be
 * room for one more dirty page both for this client (cl_dirty_max) and
 * system-wide (obd_max_dirty_pages), and there must not be a situation
 * where grant is below one page while BRW writes are still in flight
 * (those may return grant).  A dequeued waiter is given -EDQUOT in
 * ocw_rc when less than one page of grant is available, so that it
 * falls back to sync IO; on the other visible path one page of grant is
 * consumed for its page.  The waiter is then signalled on ocw_waitq.
 *
 * NOTE(review): what happens when one of the two pre-checks fails
 * (presumably the walk stops), and the else structure around the
 * grant consumption, are on lines not shown here. */
807 void osc_wake_cache_waiters(struct client_obd *cli)
809 struct list_head *l, *tmp;
810 struct osc_cache_waiter *ocw;
813 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
814 /* if we can't dirty more, we must wait until some is written */
815 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
816 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
817 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
818 "osc max %ld, sys max %d\n", cli->cl_dirty,
819 cli->cl_dirty_max, obd_max_dirty_pages);
823 /* if still dirty cache but no grant wait for pending RPCs that
824 * may yet return us some grant before doing sync writes */
825 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
826 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
827 cli->cl_w_in_flight);
831 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
832 list_del_init(&ocw->ocw_entry);
833 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
834 /* no more RPCs in flight to return grant, do sync IO */
835 ocw->ocw_rc = -EDQUOT;
836 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
838 osc_consume_write_grant(cli,
839 &ocw->ocw_oap->oap_brw_page);
842 cfs_waitq_signal(&ocw->ocw_waitq);
/* Add the grant carried in a reply body to the client's available
 * grant.  Under cl_loi_list_lock, body->oa.o_grant is added to
 * cl_avail_grant only when OBD_MD_FLGRANT is set in body->oa.o_valid.
 * @body is dereferenced unconditionally, so callers must pass a valid
 * pointer. */
848 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
850 client_obd_list_lock(&cli->cl_loi_list_lock);
851 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
852 if (body->oa.o_valid & OBD_MD_FLGRANT)
853 cli->cl_avail_grant += body->oa.o_grant;
854 /* waiters are woken in brw_interpret */
855 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Forward declaration: osc_shrink_grant_to_target() calls this before
 * the definition appears later in the file. */
858 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
859 void *key, obd_count vallen, void *val,
860 struct ptlrpc_request_set *set);
/* Reply interpreter for a grant-shrink request.
 *
 * One visible path gives the grant recorded in aa->aa_oa back to
 * cl_avail_grant under cl_loi_list_lock (presumably when the RPC
 * failed -- TODO confirm, the condition is not shown).  The other
 * unpacks the reply body and passes it to osc_update_grant().
 *
 * NOTE(review): the result of lustre_swab_repbuf() is handed straight
 * to osc_update_grant(), which dereferences it; no NULL check sits
 * between the two calls.  Other reply handlers in this file treat a
 * NULL result as an unpack failure -- confirm a check is not needed
 * here.
 * NOTE(review): the length passed to lustre_swab_repbuf() is the size
 * of *oa (struct obdo) while the result is used as struct ost_body --
 * confirm the two sizes are meant to be interchangeable.
 * NOTE(review): the cleanup of aa->aa_oa and the return statement are
 * on lines not shown here. */
862 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
865 struct osc_grant_args *aa = data;
866 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
867 struct obdo *oa = aa->aa_oa;
868 struct ost_body *body;
871 client_obd_list_lock(&cli->cl_loi_list_lock);
872 cli->cl_avail_grant += oa->o_grant;
873 client_obd_list_unlock(&cli->cl_loi_list_lock);
876 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
877 lustre_swab_ost_body);
878 osc_update_grant(cli, body);
/* Give back a quarter of the available grant by piggy-backing on @oa.
 *
 * Under cl_loi_list_lock, oa->o_grant is set to cl_avail_grant / 4 and
 * that amount is removed from cl_avail_grant.  @oa is then flagged
 * OBD_FL_SHRINK_GRANT and the grant-shrink deadline is re-armed. */
884 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
886 client_obd_list_lock(&cli->cl_loi_list_lock);
887 oa->o_grant = cli->cl_avail_grant / 4;
888 cli->cl_avail_grant -= oa->o_grant;
889 client_obd_list_unlock(&cli->cl_loi_list_lock);
890 oa->o_flags |= OBD_FL_SHRINK_GRANT;
891 osc_update_next_shrink(cli);
894 /* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal.
 *
 * The chosen target is passed to osc_shrink_grant_to_target(), whose
 * result is returned.
 *
 * NOTE(review): target is computed from cl_max_pages_per_rpc, i.e. as a
 * number of pages, but it is compared against cl_avail_grant, which the
 * rest of this file adjusts in units of CFS_PAGE_SIZE bytes
 * (osc_consume_write_grant) and which osc_announce_cached() compares
 * with a page count shifted by CFS_PAGE_SHIFT.  This looks like a
 * pages-versus-bytes mismatch that makes the target far smaller than
 * the comment above describes -- confirm, and if so shift both targets
 * by CFS_PAGE_SHIFT. */
898 static int osc_shrink_grant(struct client_obd *cli)
900 long target = (cli->cl_max_rpcs_in_flight + 1) *
901 cli->cl_max_pages_per_rpc;
903 client_obd_list_lock(&cli->cl_loi_list_lock);
904 if (cli->cl_avail_grant <= target)
905 target = cli->cl_max_pages_per_rpc;
906 client_obd_list_unlock(&cli->cl_loi_list_lock);
908 return osc_shrink_grant_to_target(cli, target);
/* Shrink the available grant down to @target and tell the OST.
 *
 * @target is first raised to at least cl_max_pages_per_rpc.  If the
 * available grant is already at or below the target nothing is shrunk.
 * Otherwise a body is filled by osc_announce_cached(), the surplus
 * (cl_avail_grant - target) is moved into body->oa.o_grant,
 * cl_avail_grant is set to the target, the body is flagged
 * OBD_FL_SHRINK_GRANT, and it is sent with osc_set_info_async() under
 * KEY_GRANT_SHRINK.  A later visible path adds the surplus back to
 * cl_avail_grant (presumably when the send failed -- TODO confirm).
 *
 * NOTE(review): the lower clamp compares @target with
 * cl_max_pages_per_rpc, a page count, while cl_avail_grant is kept in
 * bytes elsewhere in this file -- confirm the intended unit.
 * NOTE(review): cl_loi_list_lock is dropped after the early-exit check
 * and re-taken before the surplus is computed, so cl_avail_grant may
 * have changed in between -- confirm the surplus cannot go negative.
 * NOTE(review): the allocation and release of body, the branch
 * conditions and the return statements are on lines not shown here. */
911 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
914 struct ost_body *body;
917 client_obd_list_lock(&cli->cl_loi_list_lock);
918 /* Don't shrink if the available grant is already at or below the
 * desired limit.  We don't want to shrink below a single RPC, as that
 * will negatively impact block allocation and long-term performance. */
921 if (target < cli->cl_max_pages_per_rpc)
922 target = cli->cl_max_pages_per_rpc;
924 if (target >= cli->cl_avail_grant) {
925 client_obd_list_unlock(&cli->cl_loi_list_lock);
928 client_obd_list_unlock(&cli->cl_loi_list_lock);
934 osc_announce_cached(cli, &body->oa, 0);
936 client_obd_list_lock(&cli->cl_loi_list_lock);
937 body->oa.o_grant = cli->cl_avail_grant - target;
938 cli->cl_avail_grant = target;
939 client_obd_list_unlock(&cli->cl_loi_list_lock);
940 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
941 osc_update_next_shrink(cli);
943 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
944 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
945 sizeof(*body), body, NULL);
947 client_obd_list_lock(&cli->cl_loi_list_lock);
948 cli->cl_avail_grant += body->oa.o_grant;
949 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Grant is only considered for shrinking while the available grant
 * exceeds GRANT_SHRINK_LIMIT (one maximum-size BRW).
 *
 * osc_should_shrink_grant() checks whether the shrink deadline
 * cl_next_shrink_grant has been reached, allowing 5 ticks of slack.
 * Inside that window it tests that the import is in LUSTRE_IMP_FULL
 * state and that cl_avail_grant is above the limit; the deadline can
 * also be re-armed through osc_update_next_shrink().
 *
 * NOTE(review): the return values and the exact path that re-arms the
 * deadline are on lines not shown here; callers use the result as a
 * boolean. */
955 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
956 static int osc_should_shrink_grant(struct client_obd *client)
958 cfs_time_t time = cfs_time_current();
959 cfs_time_t next_shrink = client->cl_next_shrink_grant;
960 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
961 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
962 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
965 osc_update_next_shrink(client);
/* Timeout callback registered by osc_add_shrink_grant(): walks every
 * client_obd linked on item->ti_obd_list and calls osc_shrink_grant()
 * for each one that osc_should_shrink_grant() selects.
 *
 * NOTE(review): @data is not referenced on any visible line and the
 * return statement is on a line not shown here. */
970 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
972 struct client_obd *client;
974 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
975 if (osc_should_shrink_grant(client))
976 osc_shrink_grant(client);
/* Register @client for periodic grant shrinking.
 *
 * ptlrpc_add_timeout_client() is called with the client's
 * cl_grant_shrink_interval, osc_grant_shrink_grant_cb() as the callback
 * and the client's cl_grant_shrink_list as its list link.  A failure is
 * logged with CERROR; the success path logs at D_CACHE and arms the
 * first shrink deadline.
 *
 * NOTE(review): one argument of ptlrpc_add_timeout_client(), the error
 * check and the return statements are on lines not shown here. */
981 static int osc_add_shrink_grant(struct client_obd *client)
985 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
987 osc_grant_shrink_grant_cb, NULL,
988 &client->cl_grant_shrink_list);
990 CERROR("add grant client %s error %d\n",
991 client->cl_import->imp_obd->obd_name, rc);
994 CDEBUG(D_CACHE, "add grant client %s \n",
995 client->cl_import->imp_obd->obd_name);
996 osc_update_next_shrink(client);
/* Unregister @client from periodic grant shrinking by removing its
 * cl_grant_shrink_list link through ptlrpc_del_timeout_client(), whose
 * result is returned.
 *
 * NOTE(review): the remaining argument(s) of the call are on a line not
 * shown here. */
1000 static int osc_del_shrink_grant(struct client_obd *client)
1002 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise the client's grant from the connect data.
 *
 * cl_avail_grant is set to ocd->ocd_grant under cl_loi_list_lock.  When
 * the connection negotiated OBD_CONNECT_GRANT_SHRINK and the client is
 * not yet linked on a shrink list, it is registered through
 * osc_add_shrink_grant().  The resulting grant is logged and asserted
 * to be non-negative. */
1006 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1008 client_obd_list_lock(&cli->cl_loi_list_lock);
1009 cli->cl_avail_grant = ocd->ocd_grant;
1010 client_obd_list_unlock(&cli->cl_loi_list_lock);
1012 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1013 list_empty(&cli->cl_grant_shrink_list))
1014 osc_add_shrink_grant(cli);
1016 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1017 cli->cl_avail_grant, cli->cl_lost_grant);
1018 LASSERT(cli->cl_avail_grant >= 0);
1021 /* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Zero-fill the part of the page array that the short read did not
 * cover.  The first loop walks the pages while bytes that were read
 * remain; when the read ends inside a page, the tail of that page
 * (from nob_read up to the page's count) is cleared.  The second loop
 * clears the full count of every remaining page.  Pages are mapped with
 * cfs_kmap() for the memset and unmapped again afterwards.
 *
 * NOTE(review): the declarations of i and ptr and the statements that
 * advance i and page_count are on lines not shown here. */
1025 static void handle_short_read(int nob_read, obd_count page_count,
1026 struct brw_page **pga)
1031 /* skip bytes read OK */
1032 while (nob_read > 0) {
1033 LASSERT (page_count > 0);
1035 if (pga[i]->count > nob_read) {
1036 /* EOF inside this page */
1037 ptr = cfs_kmap(pga[i]->pg) +
1038 (pga[i]->off & ~CFS_PAGE_MASK);
1039 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1040 cfs_kunmap(pga[i]->pg);
1046 nob_read -= pga[i]->count;
1051 /* zero remaining pages */
1052 while (page_count-- > 0) {
1053 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1054 memset(ptr, 0, pga[i]->count);
1055 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return codes of a BRW_WRITE reply.
 *
 * The RC vector (niocount entries) is fetched from the reply and
 * byte-swapped entry by entry when lustre_rep_need_swab() says the
 * reply needs it.  The first negative entry is returned as the error.
 * Any other non-zero entry is logged as invalid, as are a missing or
 * short vector and a bulk transfer whose byte count differs from
 * @requested_nob.
 *
 * NOTE(review): the reply buffer index is written as REQ_REC_OFF + 1
 * although it indexes the reply message; the reply handlers elsewhere
 * in this file use REPLY_REC_OFF -- confirm the two constants are equal
 * or switch to REPLY_REC_OFF + 1.
 * NOTE(review): @page_count and @pga are not referenced on any visible
 * line; the declarations of remote_rcs and i and the return values of
 * the logged error paths are on lines not shown here. */
1060 static int check_write_rcs(struct ptlrpc_request *req,
1061 int requested_nob, int niocount,
1062 obd_count page_count, struct brw_page **pga)
1066 /* return error if any niobuf was in error */
1067 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1068 sizeof(*remote_rcs) * niocount, NULL);
1069 if (remote_rcs == NULL) {
1070 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1073 if (lustre_rep_need_swab(req))
1074 for (i = 0; i < niocount; i++)
1075 __swab32s(&remote_rcs[i]);
1077 for (i = 0; i < niocount; i++) {
1078 if (remote_rcs[i] < 0)
1079 return(remote_rcs[i]);
1081 if (remote_rcs[i] != 0) {
1082 CERROR("rc[%d] invalid (%d) req %p\n",
1083 i, remote_rcs[i], req);
1088 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1089 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1090 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Decide whether two consecutive brw pages can share one niobuf.
 *
 * When the flags of @p1 and @p2 differ, a CERROR warning is logged if
 * they differ in any bit other than OBD_BRW_FROM_GRANT and
 * OBD_BRW_ASYNC.  The visible return statement reports whether @p2
 * starts exactly where @p1 ends (p1->off + p1->count == p2->off).
 *
 * NOTE(review): the result returned from the differing-flags branch is
 * on a line not shown here. */
1097 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1099 if (p1->flag != p2->flag) {
1100 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1102 /* warn if we try to combine flags that we don't know to be
1103 * safe to combine */
1104 if ((p1->flag & mask) != (p2->flag & mask))
1105 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1106 "same brw?\n", p1->flag, p2->flag);
1110 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of the pages in
 * @pga, using the algorithm selected by @cksum_type.
 *
 * The checksum is seeded with init_checksum() and updated page by page
 * with compute_checksum(); for each page the in-page offset is derived
 * from the file offset through OSC_FILE2MEM_OFF() with @pshift, and at
 * most the remaining @nob bytes are fed in.  Each page is mapped with
 * cfs_kmap() for the duration of its update.
 *
 * Two fault-injection hooks are visible: for OST_READ the first page
 * can be overwritten before checksumming to simulate a corrupted
 * transfer, and for OST_WRITE the code can deliberately produce a wrong
 * checksum instead of touching the data.
 *
 * NOTE(review): the declarations of i and cksum, the statements that
 * advance i and pg_count, the body of the OST_WRITE fault hook and the
 * return statement are on lines not shown here. */
1113 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1114 struct brw_page **pga, int opc,
1115 cksum_type_t cksum_type, int pshift)
1120 LASSERT (pg_count > 0);
1121 cksum = init_checksum(cksum_type);
1122 while (nob > 0 && pg_count > 0) {
1123 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1124 int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
1125 int count = pga[i]->count > nob ? nob : pga[i]->count;
1127 /* corrupt the data before we compute the checksum, to
1128 * simulate an OST->client data error */
1129 if (i == 0 && opc == OST_READ &&
1130 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1131 memcpy(ptr + off, "bad1", min(4, nob));
1132 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1133 cfs_kunmap(pga[i]->pg);
1134 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1137 nob -= pga[i]->count;
1141 /* For sending we only compute the wrong checksum instead
1142 * of corrupting the data so it is still correct on a redo */
1143 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
1149 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1150 struct lov_stripe_md *lsm, obd_count page_count,
1151 struct brw_page **pga,
1152 struct ptlrpc_request **reqp, int pshift)
1154 struct ptlrpc_request *req;
1155 struct ptlrpc_bulk_desc *desc;
1156 struct ost_body *body;
1157 struct obd_ioobj *ioobj;
1158 struct niobuf_remote *niobuf;
1159 __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1160 int niocount, i, requested_nob, opc, rc;
1161 struct ptlrpc_request_pool *pool;
1162 struct osc_brw_async_args *aa;
1163 struct brw_page *pg_prev;
1166 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1167 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
1169 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1170 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1172 for (niocount = i = 1; i < page_count; i++) {
1173 if (!can_merge_pages(pga[i - 1], pga[i]))
1177 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1178 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1180 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1185 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1186 ptlrpc_at_set_req_timeout(req);
1188 if (opc == OST_WRITE)
1189 desc = ptlrpc_prep_bulk_imp (req, page_count,
1190 BULK_GET_SOURCE, OST_BULK_PORTAL);
1192 desc = ptlrpc_prep_bulk_imp (req, page_count,
1193 BULK_PUT_SINK, OST_BULK_PORTAL);
1195 GOTO(out, rc = -ENOMEM);
1196 /* NB request now owns desc and will free it when it gets freed */
1198 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1199 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1200 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1201 niocount * sizeof(*niobuf));
1203 lustre_set_wire_obdo(&body->oa, oa);
1204 obdo_to_ioobj(oa, ioobj);
1205 ioobj->ioo_bufcnt = niocount;
1207 LASSERT (page_count > 0);
1209 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1210 struct brw_page *pg = pga[i];
1212 LASSERT(pg->count > 0);
1213 LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
1214 pg->count <= CFS_PAGE_SIZE,
1215 "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
1216 i, pg, pg->off, pg->count, pshift);
1218 LASSERTF(i == 0 || pg->off > pg_prev->off,
1219 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1220 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1222 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1223 pg_prev->pg, page_private(pg_prev->pg),
1224 pg_prev->pg->index, pg_prev->off);
1226 LASSERTF(i == 0 || pg->off > pg_prev->off,
1227 "i %d p_c %u\n", i, page_count);
1229 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1230 (pg->flag & OBD_BRW_SRVLOCK));
1232 ptlrpc_prep_bulk_page(desc, pg->pg,
1233 OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
1235 requested_nob += pg->count;
1237 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1239 niobuf->len += pg->count;
1241 niobuf->offset = pg->off;
1242 niobuf->len = pg->count;
1243 niobuf->flags = pg->flag;
1248 LASSERTF((void *)(niobuf - niocount) ==
1249 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1250 niocount * sizeof(*niobuf)),
1251 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1252 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1253 (void *)(niobuf - niocount));
1255 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1256 if (osc_should_shrink_grant(cli))
1257 osc_shrink_grant_local(cli, &body->oa);
1259 /* size[REQ_REC_OFF] still sizeof (*body) */
1260 if (opc == OST_WRITE) {
1261 if (cli->cl_checksum) {
1262 /* store cl_cksum_type in a local variable since
1263 * it can be changed via lprocfs */
1264 cksum_type_t cksum_type = cli->cl_cksum_type;
1266 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1267 oa->o_flags &= OBD_FL_LOCAL_MASK;
1268 body->oa.o_flags = 0;
1270 body->oa.o_flags |= cksum_type_pack(cksum_type);
1271 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1272 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1275 cksum_type, pshift);
1276 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1278 /* save this in 'oa', too, for later checking */
1279 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1280 oa->o_flags |= cksum_type_pack(cksum_type);
1282 /* clear out the checksum flag, in case this is a
1283 * resend but cl_checksum is no longer set. b=11238 */
1284 oa->o_valid &= ~OBD_MD_FLCKSUM;
1286 oa->o_cksum = body->oa.o_cksum;
1287 /* 1 RC per niobuf */
1288 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1289 ptlrpc_req_set_repsize(req, 3, size);
1291 if (cli->cl_checksum) {
1292 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1293 body->oa.o_flags = 0;
1294 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1295 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1297 /* 1 RC for the whole I/O */
1298 ptlrpc_req_set_repsize(req, 2, size);
1301 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1302 aa = ptlrpc_req_async_args(req);
1304 aa->aa_requested_nob = requested_nob;
1305 aa->aa_nio_count = niocount;
1306 aa->aa_page_count = page_count;
1310 aa->aa_pshift = pshift;
1311 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1317 ptlrpc_req_finished (req);
/*
 * Compare the client and server checksums of a bulk write and report a
 * mismatch.
 *
 * When server_cksum equals client_cksum only a D_PAGE debug message is
 * logged.  Otherwise the checksum is recomputed over pga with
 * osc_checksum_bulk(), a diagnosis string is selected, and the mismatch is
 * reported through LCONSOLE_ERROR_MSG(0x132) and CERROR.
 *
 * oa is the obdo whose o_valid/o_flags select the checksum type used for
 * the recomputation; client_cksum_type is the type used for the original
 * request.
 *
 * NOTE(review): the return statements are not visible in this excerpt.  The
 * caller tests the result as a boolean, so presumably 0 means the checksums
 * matched and non-zero means a mismatch -- confirm against the full file.
 */
1321 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1322 __u32 client_cksum, __u32 server_cksum, int nob,
1323 obd_count page_count, struct brw_page **pga,
1324 cksum_type_t client_cksum_type, int pshift)
1328 cksum_type_t cksum_type;
1330 if (server_cksum == client_cksum) {
1331 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Checksum type for the recomputation: unpacked from oa->o_flags when
 * OBD_MD_FLFLAGS is valid, with OBD_CKSUM_CRC32 assigned as the fallback. */
1335 if (oa->o_valid & OBD_MD_FLFLAGS)
1336 cksum_type = cksum_type_unpack(oa->o_flags);
1338 cksum_type = OBD_CKSUM_CRC32;
1340 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1341 cksum_type, pshift);
/* Pick a diagnosis by comparing the recomputed checksum with the server
 * value and with the checksum originally sent by the client. */
1343 if (cksum_type != client_cksum_type)
1344 msg = "the server did not use the checksum type specified in "
1345 "the original request - likely a protocol problem";
1346 else if (new_cksum == server_cksum)
1347 msg = "changed on the client after we checksummed it - "
1348 "likely false positive due to mmap IO (bug 11742)";
1349 else if (new_cksum == client_cksum)
1350 msg = "changed in transit before arrival at OST";
1352 msg = "changed in transit AND doesn't match the original - "
1353 "likely false positive due to mmap IO (bug 11742)";
1355 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1356 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1357 "["LPU64"-"LPU64"]\n",
1358 msg, libcfs_nid2str(peer->nid),
1359 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1360 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1363 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1365 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1366 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1367 "client csum now %x\n", client_cksum, client_cksum_type,
1368 server_cksum, cksum_type, new_cksum);
1373 /* Note rc enters this function as number of bytes transferred */
/*
 * Validate the reply of a completed bulk read/write request.
 *
 * The osc_brw_async_args stored in req describe the I/O.  The reply
 * ost_body is obtained with lustre_swab_repbuf().  For OST_WRITE replies
 * that carry user/group quota flags lquota_setdq() is called, and
 * osc_update_grant() is called with the reply body.
 *
 * OST_WRITE: a positive rc is reported as unexpected, the bulk byte count
 * is asserted equal to aa_requested_nob, the write checksum is checked when
 * the request carried one, and rc is taken from check_write_rcs().
 *
 * OST_READ: rc is checked against aa_requested_nob and against
 * bd_nob_transferred, short reads are passed to handle_short_read(), and
 * when the reply sets OBD_MD_FLCKSUM the checksum is recomputed locally and
 * compared with the server value.
 *
 * The reply obdo is copied into aa->aa_oa with lustre_get_wire_obdo().
 *
 * NOTE(review): several error-path lines (returns/GOTOs and labels) are not
 * visible in this excerpt; confirm exact error codes against the full file.
 */
1374 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1376 struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1377 const lnet_process_id_t *peer =
1378 &req->rq_import->imp_connection->c_peer;
1379 struct client_obd *cli = aa->aa_cli;
1380 struct ost_body *body;
1381 __u32 client_cksum = 0;
1384 if (rc < 0 && rc != -EDQUOT)
1387 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1388 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1389 lustre_swab_ost_body);
1391 CERROR ("Can't unpack body\n");
1395 /* set/clear over quota flag for a uid/gid */
1396 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1397 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1398 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1399 body->oa.o_gid, body->oa.o_valid,
1405 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1406 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1408 osc_update_grant(cli, body);
/* Write replies are handled entirely inside this branch; the code after it
 * runs only for reads. */
1410 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1412 CERROR ("Unexpected +ve rc %d\n", rc);
1415 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1417 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1418 check_write_checksum(&body->oa, peer, client_cksum,
1419 body->oa.o_cksum, aa->aa_requested_nob,
1420 aa->aa_page_count, aa->aa_ppga,
1421 cksum_type_unpack(aa->aa_oa->o_flags),
1425 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1426 aa->aa_page_count, aa->aa_ppga);
1430 /* The rest of this function executes only for OST_READs */
1431 if (rc > aa->aa_requested_nob) {
1432 CERROR("Unexpected rc %d (%d requested)\n", rc,
1433 aa->aa_requested_nob);
1437 if (rc != req->rq_bulk->bd_nob_transferred) {
1438 CERROR ("Unexpected rc %d (%d transferred)\n",
1439 rc, req->rq_bulk->bd_nob_transferred);
1443 if (rc < aa->aa_requested_nob)
1444 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1446 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1447 static int cksum_counter;
1448 __u32 server_cksum = body->oa.o_cksum;
1451 cksum_type_t cksum_type;
1453 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1454 cksum_type = cksum_type_unpack(body->oa.o_flags);
1456 cksum_type = OBD_CKSUM_CRC32;
1457 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1458 aa->aa_ppga, OST_READ,
1459 cksum_type, aa->aa_pshift);
1461 if (peer->nid == req->rq_bulk->bd_sender) {
1465 router = libcfs_nid2str(req->rq_bulk->bd_sender);
/* A server checksum of ~0 together with transferred data is reported as a
 * protocol error instead of being treated as a checksum mismatch. */
1468 if (server_cksum == ~0 && rc > 0) {
1469 CERROR("Protocol error: server %s set the 'checksum' "
1470 "bit, but didn't send a checksum. Not fatal, "
1471 "but please notify on http://bugzilla.lustre.org/\n",
1472 libcfs_nid2str(peer->nid));
1473 } else if (server_cksum != client_cksum) {
1474 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1475 "%s%s%s inum "LPU64"/"LPU64" object "
1476 LPU64"/"LPU64" extent "
1477 "["LPU64"-"LPU64"]\n",
1478 req->rq_import->imp_obd->obd_name,
1479 libcfs_nid2str(peer->nid),
1481 body->oa.o_valid & OBD_MD_FLFID ?
1482 body->oa.o_fid : (__u64)0,
1483 body->oa.o_valid & OBD_MD_FLFID ?
1484 body->oa.o_generation :(__u64)0,
1486 body->oa.o_valid & OBD_MD_FLGROUP ?
1487 body->oa.o_gr : (__u64)0,
1488 aa->aa_ppga[0]->off,
1489 aa->aa_ppga[aa->aa_page_count-1]->off +
1490 aa->aa_ppga[aa->aa_page_count-1]->count -
1492 CERROR("client %x, server %x, cksum_type %x\n",
1493 client_cksum, server_cksum, cksum_type);
1495 aa->aa_oa->o_cksum = client_cksum;
1499 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1502 } else if (unlikely(client_cksum)) {
1503 static int cksum_missed;
/* (x & -x) == x holds only when x is zero or a power of two, so the message
 * below is printed only at those values of cksum_missed. */
1506 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1507 CERROR("Checksum %u requested from %s but not sent\n",
1508 cksum_missed, libcfs_nid2str(peer->nid));
1514 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Issue one bulk read/write RPC and wait for it to complete.
 *
 * The request is built with osc_brw_prep_request() (pshift 0), sent with
 * ptlrpc_queue_wait() and post-processed by osc_brw_fini_request().  A
 * -ETIMEDOUT result on a request flagged rq_resend is logged as a bulk
 * timeout and the request is dropped.  When the final result is a
 * recoverable error per osc_recoverable_error(), the function waits for
 * cfs_time_seconds(resends) unless osc_should_resend() refuses, in which
 * case an error is logged.
 *
 * NOTE(review): the retry jumps/labels, the update of the resends counter
 * and the return statements are not visible in this excerpt.
 */
1519 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1520 struct lov_stripe_md *lsm,
1521 obd_count page_count, struct brw_page **pga)
1523 struct ptlrpc_request *request;
1527 struct l_wait_info lwi;
1530 init_waitqueue_head(&waitq);
1533 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1534 page_count, pga, &request, 0);
1538 rc = ptlrpc_queue_wait(request);
1540 if (rc == -ETIMEDOUT && request->rq_resend) {
1541 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1542 ptlrpc_req_finished(request);
1546 rc = osc_brw_fini_request(request, rc);
1548 ptlrpc_req_finished(request);
1549 if (osc_recoverable_error(rc)) {
1551 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1552 CERROR("too many resend retries, returning error\n");
/* Nothing ever wakes waitq (the condition is the constant 0), so this is a
 * plain timed wait of cfs_time_seconds(resends). */
1556 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1557 l_wait_event(waitq, 0, &lwi);
/*
 * Build and queue a replacement for a bulk request that failed with a
 * recoverable error.
 *
 * A new request is prepared from the pages and obdo recorded in aa.  Under
 * cl_loi_list_lock the oaps attached to aa are checked: when an oap that
 * holds a request reference has been interrupted, the new request is
 * dropped again.  Otherwise the new request inherits the interpret callback
 * and the async args of the old one, the oap list is moved to the new args,
 * request references held by oaps are switched to the new request, and the
 * new request is added to the request set of the old one.
 *
 * NOTE(review): the return statements and the update of aa_resends are not
 * visible in this excerpt.
 */
1564 int osc_brw_redo_request(struct ptlrpc_request *request,
1565 struct osc_brw_async_args *aa)
1567 struct ptlrpc_request *new_req;
1568 struct ptlrpc_request_set *set = request->rq_set;
1569 struct osc_brw_async_args *new_aa;
1570 struct osc_async_page *oap;
1574 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1575 CERROR("too many resend retries, returning error\n");
1579 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1581 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1582 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1583 aa->aa_cli, aa->aa_oa,
1584 NULL /* lsm unused by osc currently */,
1585 aa->aa_page_count, aa->aa_ppga, &new_req,
1590 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1592 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1593 if (oap->oap_request != NULL) {
1594 LASSERTF(request == oap->oap_request,
1595 "request %p != oap_request %p\n",
1596 request, oap->oap_request);
1597 if (oap->oap_interrupted) {
1598 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1599 ptlrpc_req_finished(new_req);
1604 /* New request takes over pga and oaps from old request.
1605 * Note that copying a list_head doesn't work, need to move it... */
1607 new_req->rq_interpret_reply = request->rq_interpret_reply;
1608 new_req->rq_async_args = request->rq_async_args;
/* rq_sent is set aa_resends seconds past the current time -- presumably to
 * delay the resend; confirm against ptlrpc. */
1609 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1611 new_aa = ptlrpc_req_async_args(new_req);
/* The struct copy above duplicated the list head; reinitialise it in the
 * new args and splice the entries across instead. */
1613 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1614 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1615 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1617 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1618 if (oap->oap_request) {
1619 ptlrpc_req_finished(oap->oap_request);
1620 oap->oap_request = ptlrpc_request_addref(new_req);
1624 /* use ptlrpc_set_add_req is safe because interpret functions work
1625 * in check_set context. only one way exist with access to request
1626 * from different thread got -EINTR - this way protected with
1627 * cl_loi_list_lock */
1628 ptlrpc_set_add_req(set, new_req);
1630 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1632 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Prepare a bulk read/write request and add it to the request set `set`
 * rather than waiting for it.
 *
 * For writes with pshift == 0, write grant is consumed for each page while
 * cl_avail_grant is at least CFS_PAGE_SIZE.  After the request has been
 * prepared, the page and RPC histograms are tallied, brw_interpret is
 * installed as the reply callback, the request is added to the set, and the
 * read or write in-flight counter is incremented under cl_loi_list_lock.
 * In the else branch, a write releases the grant of every page and wakes
 * the cache waiters.
 *
 * NOTE(review): the condition guarding the first branch (presumably success
 * of osc_brw_prep_request) and the return statement are not visible in this
 * excerpt.
 */
1636 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1637 struct lov_stripe_md *lsm, obd_count page_count,
1638 struct brw_page **pga, struct ptlrpc_request_set *set,
1641 struct ptlrpc_request *request;
1642 struct client_obd *cli = &exp->exp_obd->u.cli;
1644 struct osc_brw_async_args *aa;
1647 /* Consume write credits even if doing a sync write -
1648 * otherwise we may run out of space on OST due to grant. */
1649 /* FIXME: unaligned writes must use write grants too */
1650 if (cmd == OBD_BRW_WRITE && pshift == 0) {
1651 client_obd_list_lock(&cli->cl_loi_list_lock);
1652 for (i = 0; i < page_count; i++) {
1653 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1654 osc_consume_write_grant(cli, pga[i]);
1656 client_obd_list_unlock(&cli->cl_loi_list_lock);
1659 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1660 page_count, pga, &request, pshift);
1662 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1665 aa = ptlrpc_req_async_args(request);
1666 if (cmd == OBD_BRW_READ) {
1667 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1668 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1670 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1671 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1672 cli->cl_w_in_flight);
1674 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
/* An empty aa_oaps list is what brw_interpret uses to recognise requests
 * that came from this function. */
1676 LASSERT(list_empty(&aa->aa_oaps));
1678 request->rq_interpret_reply = brw_interpret;
1679 ptlrpc_set_add_req(set, request);
1680 client_obd_list_lock(&cli->cl_loi_list_lock);
1681 if (cmd == OBD_BRW_READ)
1682 cli->cl_r_in_flight++;
1684 cli->cl_w_in_flight++;
1685 client_obd_list_unlock(&cli->cl_loi_list_lock);
1686 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1687 } else if (cmd == OBD_BRW_WRITE) {
1688 client_obd_list_lock(&cli->cl_loi_list_lock);
1689 for (i = 0; i < page_count; i++)
1690 osc_release_write_grant(cli, pga[i], 0);
1691 osc_wake_cache_waiters(cli);
1692 client_obd_list_unlock(&cli->cl_loi_list_lock);
1699 * ugh, we want disk allocation on the target to happen in offset order. we'll
1700 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1701 * fine for our small page arrays and doesn't require allocation. it's an
1702 * insertion sort that swaps elements that are strides apart, shrinking the
1703 * stride down until it's '1' and the array is sorted.
/*
 * Sort array[0..num-1] in place so that entries are ordered by ascending
 * ->off.  The starting stride is grown with stride = 3 * stride + 1 until
 * it reaches num; the outer loop then repeats while stride > 1.
 *
 * NOTE(review): the lines that shrink the stride and store tmp back are not
 * visible in this excerpt.
 */
1705 static void sort_brw_pages(struct brw_page **array, int num)
1708 struct brw_page *tmp;
1712 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1717 for (i = stride ; i < num ; i++) {
/* Shift entries with a larger offset one stride to the right to open the
 * slot for tmp. */
1720 while (j >= stride && array[j-stride]->off > tmp->off) {
1721 array[j] = array[j - stride];
1726 } while (stride > 1);
/*
 * Count how many pages from the start of pg (at most `pages`, which must be
 * positive) can be taken before a page fails to end on a page boundary or
 * the following page fails to start on one.  In-page offsets are computed
 * as OSC_FILE2MEM_OFF(off, pshift) & ~CFS_PAGE_MASK.
 *
 * NOTE(review): the loop structure, the counter updates and some return
 * statements are not visible in this excerpt; this description is based on
 * the visible checks only.
 */
1729 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1736 LASSERT (pages > 0);
1737 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1741 if (pages == 0) /* that's all */
1744 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1745 return count; /* doesn't end on page boundary */
1748 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1749 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of `count` brw_page pointers with OBD_ALLOC.
 *
 * NOTE(review): the loop body and the return statements are not visible in
 * this excerpt; presumably ppga[i] is pointed at pga[i] and NULL is
 * returned when the allocation fails -- confirm.  Callers in this file free
 * the array with osc_release_ppga().
 */
1756 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1758 struct brw_page **ppga;
1761 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1765 for (i = 0; i < count; i++)
/*
 * Free a brw_page pointer array of `count` entries.  ppga must not be NULL.
 * Only the array itself is freed here, not the pages it points to.
 */
1770 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1772 LASSERT(ppga != NULL);
1773 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk I/O entry point.  The pages are sorted by offset and
 * sent through osc_brw_internal() in chunks of at most
 * cl_max_pages_per_rpc pages, each chunk further limited by
 * max_unfragmented_pages().
 *
 * With OBD_BRW_CHECK set in cmd the caller only wants to know whether the
 * I/O has a chance of succeeding; imp_invalid is tested for that.
 *
 * Because the brw clobbers the obdo, a copy of oinfo->oi_oa is saved when
 * more than one chunk is needed and restored before each later chunk.
 *
 * NOTE(review): the return statements and the error check after
 * osc_brw_internal() are not visible in this excerpt.
 */
1776 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1777 obd_count page_count, struct brw_page *pga,
1778 struct obd_trans_info *oti)
1780 struct obdo *saved_oa = NULL;
1781 struct brw_page **ppga, **orig;
1782 struct obd_import *imp = class_exp2cliimp(exp);
1783 struct client_obd *cli;
1784 int rc, page_count_orig;
1787 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1788 cli = &imp->imp_obd->u.cli;
1790 if (cmd & OBD_BRW_CHECK) {
1791 /* The caller just wants to know if there's a chance that this
1792 * I/O can succeed */
1794 if (imp->imp_invalid)
1799 /* test_brw with a failed create can trip this, maybe others. */
1800 LASSERT(cli->cl_max_pages_per_rpc);
/* orig and page_count_orig remember the full array so that it can be freed
 * after ppga and page_count have been advanced by the loop. */
1804 orig = ppga = osc_build_ppga(pga, page_count);
1807 page_count_orig = page_count;
1809 sort_brw_pages(ppga, page_count);
1810 while (page_count) {
1811 obd_count pages_per_brw;
1813 if (page_count > cli->cl_max_pages_per_rpc)
1814 pages_per_brw = cli->cl_max_pages_per_rpc;
1816 pages_per_brw = page_count;
1818 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);
1820 if (saved_oa != NULL) {
1821 /* restore previously saved oa */
1822 *oinfo->oi_oa = *saved_oa;
1823 } else if (page_count > pages_per_brw) {
1824 /* save a copy of oa (brw will clobber it) */
1825 OBDO_ALLOC(saved_oa);
1826 if (saved_oa == NULL)
1827 GOTO(out, rc = -ENOMEM);
1828 *saved_oa = *oinfo->oi_oa;
1831 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1832 pages_per_brw, ppga);
1837 page_count -= pages_per_brw;
1838 ppga += pages_per_brw;
1842 osc_release_ppga(orig, page_count_orig);
1844 if (saved_oa != NULL)
1845 OBDO_FREE(saved_oa);
/*
 * Asynchronous bulk I/O entry point.  The pages are sorted by offset and
 * handed, chunk by chunk, to async_internal().
 *
 * With OBD_BRW_CHECK set in cmd only the import is tested (NULL or
 * imp_invalid).
 *
 * The ppga array built here is passed on directly only when a single RPC
 * covers all pages; otherwise each chunk gets its own copy of the pointer
 * slice and an obdo copied from oinfo->oi_oa and flagged OBD_FL_TEMPORARY.
 *
 * NOTE(review): many lines (obdo allocation, error checks, the arguments
 * completing the async_internal() call, returns) are not visible in this
 * excerpt.  The visible OBD_FREE after async_internal() presumably runs
 * only on failure, as the later comment says the callee owns the memory
 * once it was passed on -- confirm.
 */
1850 static int osc_brw_async(int cmd, struct obd_export *exp,
1851 struct obd_info *oinfo, obd_count page_count,
1852 struct brw_page *pga, struct obd_trans_info *oti,
1853 struct ptlrpc_request_set *set, int pshift)
1855 struct brw_page **ppga, **orig;
1856 int page_count_orig;
1860 if (cmd & OBD_BRW_CHECK) {
1861 /* The caller just wants to know if there's a chance that this
1862 * I/O can succeed */
1863 struct obd_import *imp = class_exp2cliimp(exp);
1865 if (imp == NULL || imp->imp_invalid)
1870 orig = ppga = osc_build_ppga(pga, page_count);
1873 page_count_orig = page_count;
1875 sort_brw_pages(ppga, page_count);
1876 while (page_count) {
1877 struct brw_page **copy;
1879 obd_count pages_per_brw;
1881 /* one page less under unaligned direct i/o */
1882 pages_per_brw = min_t(obd_count, page_count,
1883 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
1886 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
1889 /* use ppga only if single RPC is going to fly */
1890 if (pages_per_brw != page_count_orig || ppga != orig) {
1891 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1893 GOTO(out, rc = -ENOMEM);
1894 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1898 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1899 GOTO(out, rc = -ENOMEM);
1901 memcpy(oa, oinfo->oi_oa, sizeof(*oa));
1902 oa->o_flags |= OBD_FL_TEMPORARY;
1906 LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
1909 rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
1914 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1916 if (oa->o_flags & OBD_FL_TEMPORARY)
1922 /* we passed it to async_internal() which is
1923 * now responsible for releasing memory */
1927 page_count -= pages_per_brw;
1928 ppga += pages_per_brw;
1932 osc_release_ppga(orig, page_count_orig);
1936 static void osc_check_rpcs(struct client_obd *cli);
1938 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1939 * the dirty accounting. Writeback completes or truncate happens before
1940 * writing starts. Must be called with the loi lock held. */
/*
 * Release the write grant held for the brw page embedded in oap.  The
 * trailing argument `sent` is passed through to osc_release_write_grant()
 * unchanged.
 */
1941 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1944 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1947 /* This maintains the lists of pending pages to read/write for a given object
1948 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1949 * to quickly find objects that are ready to send an RPC. */
/*
 * Decide whether the pages pending on lop justify building an RPC for cmd.
 * Visible checks, in order: nothing pending; missing or invalid import; an
 * entry on lop_urgent; for writes, waiters on cl_cache_waiters; and
 * lop_num_pending reaching `optimal`, which starts as cl_max_pages_per_rpc.
 *
 * NOTE(review): the return statements and the +16 adjustment of `optimal`
 * mentioned in the comment below are not visible in this excerpt;
 * presumably 0 is returned when nothing is pending and 1 for each trigger
 * -- confirm.
 */
1950 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1956 if (lop->lop_num_pending == 0)
1959 /* if we have an invalid import we want to drain the queued pages
1960 * by forcing them through rpcs that immediately fail and complete
1961 * the pages. recovery relies on this to empty the queued pages
1962 * before canceling the locks and evicting down the llite pages */
1963 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1966 /* stream rpcs in queue order as long as as there is an urgent page
1967 * queued. this is our cheap solution for good batching in the case
1968 * where writepage marks some random page in the middle of the file
1969 * as urgent because of, say, memory pressure */
1970 if (!list_empty(&lop->lop_urgent)) {
1971 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1975 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1976 optimal = cli->cl_max_pages_per_rpc;
1977 if (cmd & OBD_BRW_WRITE) {
1978 /* trigger a write rpc stream as long as there are dirtiers
1979 * waiting for space. as they're waiting, they're not going to
1980 * create more pages to coallesce with what's waiting.. */
1981 if (!list_empty(&cli->cl_cache_waiters)) {
1982 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1986 /* +16 to avoid triggering rpcs that would want to include pages
1987 * that are being queued but which can't be made ready until
1988 * the queuer finishes with the page. this is a wart for
1989 * llite::commit_write() */
1992 if (lop->lop_num_pending >= optimal)
/*
 * Check whether the first entry on lop_urgent carries ASYNC_HP; an empty
 * urgent list is handled first.
 *
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably non-zero is returned for a high-priority head entry and 0
 * otherwise -- confirm.
 */
1998 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2000 struct osc_async_page *oap;
2003 if (list_empty(&lop->lop_urgent))
2006 oap = list_entry(lop->lop_urgent.next,
2007 struct osc_async_page, oap_urgent_item);
2009 if (oap->oap_async_flags & ASYNC_HP) {
2010 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/*
 * Make the list membership of item match should_be_on.  An item for which
 * list_empty(item) is true is treated as unlinked: it is appended to the
 * tail of list when it should be on it.  A linked item that should not be
 * on the list is removed and reinitialised.  Otherwise nothing changes.
 */
2017 static void on_list(struct list_head *item, struct list_head *list,
2020 if (list_empty(item) && should_be_on)
2021 list_add_tail(item, list);
2022 else if (!list_empty(item) && !should_be_on)
2023 list_del_init(item);
2026 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2027 * can find pages to build into rpcs quickly */
/*
 * Recompute which client lists loi belongs on.  When lop_makes_hprpc() is
 * true for the write or the read lop, loi is taken off cl_loi_ready_list
 * and put on cl_loi_hp_ready_list.  Otherwise it is taken off the hp list
 * and is on cl_loi_ready_list exactly when lop_makes_rpc() is true for a
 * write or a read.  Membership of cl_loi_write_list and cl_loi_read_list
 * follows whether the respective lop has a non-zero lop_num_pending.
 */
2028 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2030 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2031 lop_makes_hprpc(&loi->loi_read_lop)) {
2033 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2034 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2036 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2037 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2038 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2039 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2042 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2043 loi->loi_write_lop.lop_num_pending);
2045 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2046 loi->loi_read_lop.lop_num_pending);
/*
 * Add delta (which may be negative) to lop_num_pending and to the matching
 * client-wide counter: cl_pending_w_pages when cmd has OBD_BRW_WRITE set,
 * cl_pending_r_pages otherwise (the else line itself is not visible in this
 * excerpt).
 */
2049 static void lop_update_pending(struct client_obd *cli,
2050 struct loi_oap_pages *lop, int cmd, int delta)
2052 lop->lop_num_pending += delta;
2053 if (cmd & OBD_BRW_WRITE)
2054 cli->cl_pending_w_pages += delta;
2056 cli->cl_pending_r_pages += delta;
2059 /* this is called when a sync waiter receives an interruption. Its job is to
2060 * get the caller woken as soon as possible. If its page hasn't been put in an
2061 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2062 * desiring interruption which will forcefully complete the rpc once the rpc
/*
 * Interruption callback for a sync waiter.  Under cl_loi_list_lock the oap
 * that embeds occ is marked interrupted.  When the oap already holds a
 * request reference, that request is marked interrupted and ptlrpcd is
 * woken.  When the oap is still on a pending list it is unlinked from the
 * pending and urgent lists, the pending count is decremented, list
 * membership is recomputed, and the oap is completed in its group with
 * -EINTR.
 *
 * NOTE(review): the assignment of loi and the jump that skips the dequeue
 * path after the request case are not visible in this excerpt.
 */
2064 static void osc_occ_interrupted(struct oig_callback_context *occ)
2066 struct osc_async_page *oap;
2067 struct loi_oap_pages *lop;
2068 struct lov_oinfo *loi;
2071 /* XXX member_of() */
2072 oap = list_entry(occ, struct osc_async_page, oap_occ);
2074 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
2076 oap->oap_interrupted = 1;
2078 /* ok, it's been put in an rpc. only one oap gets a request reference */
2079 if (oap->oap_request != NULL) {
2080 ptlrpc_mark_interrupted(oap->oap_request);
2081 ptlrpcd_wake(oap->oap_request);
2085 /* we don't get interruption callbacks until osc_trigger_group_io()
2086 * has been called and put the sync oaps in the pending/urgent lists.*/
2087 if (!list_empty(&oap->oap_pending_item)) {
2088 list_del_init(&oap->oap_pending_item);
2089 list_del_init(&oap->oap_urgent_item);
2092 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2093 &loi->loi_write_lop : &loi->loi_read_lop;
2094 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2095 loi_list_maint(oap->oap_cli, oap->oap_loi);
2097 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
2098 oap->oap_oig = NULL;
2102 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
2105 /* this is trying to propagate async writeback errors back up to the
2106 * application. As an async write fails we record the error code for later if
2107 * the app does an fsync. As long as errors persist we force future rpcs to be
2108 * sync so that the app can get a sync error and break the cycle of queueing
2109 * pages for which writeback will fail. */
/*
 * Update the async write error state in ar after an RPC with the given xid.
 * Visible effects: ar_force_sync is set and ar_min_xid is sampled from
 * ptlrpc_sample_next_xid(); ar_force_sync is cleared again once it is set
 * and xid has reached ar_min_xid.
 *
 * NOTE(review): the remaining parameter(s) and the conditions selecting the
 * two paths (presumably failure versus success of the RPC) are not visible
 * in this excerpt -- confirm.
 */
2110 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2117 ar->ar_force_sync = 1;
2118 ar->ar_min_xid = ptlrpc_sample_next_xid();
2123 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2124 ar->ar_force_sync = 0;
/*
 * Queue oap on the pending list of the read or write lop of its object,
 * chosen by OBD_BRW_WRITE in oap_cmd, and increment the pending counters by
 * one.  Pages flagged ASYNC_HP are added at the head of lop_urgent, pages
 * flagged ASYNC_URGENT at its tail; other pages are not put on lop_urgent.
 */
2127 static void osc_oap_to_pending(struct osc_async_page *oap)
2129 struct loi_oap_pages *lop;
2131 if (oap->oap_cmd & OBD_BRW_WRITE)
2132 lop = &oap->oap_loi->loi_write_lop;
2134 lop = &oap->oap_loi->loi_read_lop;
2136 if (oap->oap_async_flags & ASYNC_HP)
2137 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2138 else if (oap->oap_async_flags & ASYNC_URGENT)
2139 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2140 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2141 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2144 /* this must be called holding the loi list lock to give coverage to exit_cache,
2145 * async_flag maintenance, and oap_request */
/*
 * Complete one async page after its RPC finished with rc.
 *
 * The request reference held by the oap is dropped (its xid is remembered
 * first), the async flags are cleared under oap_lock and the interrupted
 * state is reset.  For writes the result is fed into the per-client and
 * per-object osc_async_rc state.  When rc is 0 and oa is given, the
 * blocks/mtime/atime/ctime fields that oa marks valid are copied into the
 * lvb of the object.  One path then leaves the cache accounting and
 * completes the page through oig_complete_one(); the other calls the
 * ap_completion hook of the caller and afterwards either puts the page back
 * on the pending queue or leaves the cache accounting.
 *
 * NOTE(review): the conditions selecting the oig path and the re-queue path
 * are not visible in this excerpt.
 */
2146 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
2147 struct osc_async_page *oap, int sent, int rc)
2152 if (oap->oap_request != NULL) {
2153 xid = ptlrpc_req_xid(oap->oap_request);
2154 ptlrpc_req_finished(oap->oap_request);
2155 oap->oap_request = NULL;
2158 spin_lock(&oap->oap_lock);
2159 oap->oap_async_flags = 0;
2160 spin_unlock(&oap->oap_lock);
2161 oap->oap_interrupted = 0;
2163 if (oap->oap_cmd & OBD_BRW_WRITE) {
2164 osc_process_ar(&cli->cl_ar, xid, rc);
2165 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2168 if (rc == 0 && oa != NULL) {
2169 if (oa->o_valid & OBD_MD_FLBLOCKS)
2170 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2171 if (oa->o_valid & OBD_MD_FLMTIME)
2172 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2173 if (oa->o_valid & OBD_MD_FLATIME)
2174 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2175 if (oa->o_valid & OBD_MD_FLCTIME)
2176 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2180 osc_exit_cache(cli, oap, sent);
2181 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2182 oap->oap_oig = NULL;
2187 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2188 oap->oap_cmd, oa, rc);
2190 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2191 * I/O on the page could start, but OSC calls it under lock
2192 * and thus we can add oap back to pending safely */
2194 /* upper layer wants to leave the page on pending queue */
2195 osc_oap_to_pending(oap);
2197 osc_exit_cache(cli, oap, sent);
/*
 * Reply callback installed on bulk requests by this file; data is the
 * osc_brw_async_args of the request.
 *
 * The reply is post-processed by osc_brw_fini_request(), and a recoverable
 * error is retried through osc_brw_redo_request().  Then, under
 * cl_loi_list_lock, the read or write in-flight counter is decremented and
 * either every oap on aa_oaps is unlinked and completed (requests built by
 * osc_send_oap_rpc) or the write grant of every page is released (requests
 * built by async_internal).  Finally cache waiters are woken,
 * osc_check_rpcs() is called, the lock is dropped and the page pointer
 * array is freed.
 *
 * NOTE(review): the assignment of cli, the handling of the redo result and
 * the return statements are not visible in this excerpt.
 */
2201 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2203 struct osc_brw_async_args *aa = data;
2204 struct client_obd *cli;
2207 rc = osc_brw_fini_request(request, rc);
2208 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2210 if (osc_recoverable_error(rc)) {
2211 rc = osc_brw_redo_request(request, aa);
2217 client_obd_list_lock(&cli->cl_loi_list_lock);
2218 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2219 * is called so we know whether to go to sync BRWs or wait for more
2220 * RPCs to complete */
2221 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2222 cli->cl_w_in_flight--;
2224 cli->cl_r_in_flight--;
2226 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2227 struct osc_async_page *oap, *tmp;
2228 /* the caller may re-use the oap after the completion call so
2229 * we need to clean it up a little */
2230 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2231 list_del_init(&oap->oap_rpc_item);
2232 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2234 OBDO_FREE(aa->aa_oa);
2235 } else { /* from async_internal() */
2237 for (i = 0; i < aa->aa_page_count; i++)
2238 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
/* On this path the obdo is freed here only when it carries
 * OBD_FL_TEMPORARY. */
2240 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2241 OBDO_FREE(aa->aa_oa);
2243 osc_wake_cache_waiters(cli);
2244 osc_check_rpcs(cli);
2245 client_obd_list_unlock(&cli->cl_loi_list_lock);
2247 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Build a bulk request for the oaps on rpc_list (which must not be empty).
 *
 * A brw_page pointer array of page_count entries is allocated and filled
 * from the oaps, the ap_fill_obdo hook of the caller fills the obdo, the
 * pages are sorted by offset and osc_brw_prep_request() builds the request.
 * Timestamps in the obdo inside the request are then refreshed (see the
 * bug 10150 comment in the body) and rpc_list is spliced onto aa_oaps of
 * the request, leaving rpc_list empty.
 *
 * Failures are reported as ERR_PTR() values.
 *
 * NOTE(review): the obdo allocation, the condition around the lock handle
 * assignment, the remaining aa field assignments and the final return are
 * not visible in this excerpt.
 */
2252 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2253 struct list_head *rpc_list,
2254 int page_count, int cmd)
2256 struct ptlrpc_request *req;
2257 struct brw_page **pga = NULL;
2258 struct osc_brw_async_args *aa;
2259 struct obdo *oa = NULL;
2260 struct obd_async_page_ops *ops = NULL;
2261 void *caller_data = NULL;
2262 struct osc_async_page *oap;
2263 struct ldlm_lock *lock = NULL;
2268 LASSERT(!list_empty(rpc_list));
2270 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2272 RETURN(ERR_PTR(-ENOMEM));
2276 GOTO(out, req = ERR_PTR(-ENOMEM));
2279 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2281 ops = oap->oap_caller_ops;
2282 caller_data = oap->oap_caller_data;
2283 lock = oap->oap_ldlm_lock;
2285 pga[i] = &oap->oap_brw_page;
2286 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2287 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2288 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2292 /* always get the data for the obdo for the rpc */
2293 LASSERT(ops != NULL);
2294 ops->ap_fill_obdo(caller_data, cmd, oa);
2296 oa->o_handle = lock->l_remote_handle;
2297 oa->o_valid |= OBD_MD_FLHANDLE;
2300 sort_brw_pages(pga, page_count);
2301 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
2303 CERROR("prep_req failed: %d\n", rc);
2304 GOTO(out, req = ERR_PTR(rc));
/* From here on oa points at the obdo inside the request message rather than
 * at the obdo that was filled in above. */
2306 oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2307 sizeof(struct ost_body)))->oa;
2309 /* Need to update the timestamps after the request is built in case
2310 * we race with setattr (locally or in queue at OST). If OST gets
2311 * later setattr before earlier BRW (as determined by the request xid),
2312 * the OST will not use BRW timestamps. Sadly, there is no obvious
2313 * way to do this in a single call. bug 10150 */
2314 if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2315 /* in case of lockless read/write do not use inode's
2316 * timestamps because concurrent stat might fill the
2317 * inode with out-of-date times, send current
2319 if (cmd & OBD_BRW_WRITE) {
2320 oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2321 oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2322 valid = OBD_MD_FLATIME;
2324 oa->o_atime = LTIME_S(CURRENT_TIME);
2325 oa->o_valid |= OBD_MD_FLATIME;
2326 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2329 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2331 ops->ap_update_obdo(caller_data, cmd, oa, valid);
2333 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2334 aa = ptlrpc_req_async_args(req);
2335 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2336 list_splice(rpc_list, &aa->aa_oaps);
2337 CFS_INIT_LIST_HEAD(rpc_list);
2344 OBD_FREE(pga, sizeof(*pga) * page_count);
2349 /* the loi lock is held across this function but it's allowed to release
2350 * and reacquire it during its work */
2352 * prepare pages for ASYNC io and put pages in send queue.
2356 * \param cmd - OBD_BRW_* macros
2357 * \param lop - pending pages
2359 * \return zero if the pages were successfully added to the send queue.
2360 * \return non-zero if an error occurred.
/*
 * Gather pages from lop->lop_pending into one bulk RPC for cmd and hand the
 * request to ptlrpcd.
 *
 * Visible steps: a high-priority (ASYNC_HP) page at the head of
 * lop->lop_urgent is first moved to the front of the pending list.  Each
 * pending page is then made ready through ap_make_ready() if it is not
 * ASYNC_READY yet, unlinked from the pending and urgent lists, sized via
 * ap_refresh_count() unless ASYNC_COUNT_STABLE is set, and appended to a
 * local rpc_list; pages whose oap_count is <= 0 are completed right away
 * with osc_ap_completion().  Collection is bounded by
 * cli->cl_max_pages_per_rpc and by the PTLRPC_MAX_BRW_SIZE alignment and
 * page-gap checks below.
 *
 * cl_loi_list_lock is dropped before osc_build_req() and re-taken
 * afterwards.  On the error path every collected page is completed through
 * osc_ap_completion() and PTR_ERR(req) is returned.  On success the read or
 * write in-flight counter is bumped, one oap takes a reference on the
 * request, and brw_interpret() is installed as the reply handler.
 * NOTE(review): the success and "nothing to send" return values are not
 * visible in this excerpt - confirm against the full function.
 */
2362 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2363 int cmd, struct loi_oap_pages *lop)
2365 struct ptlrpc_request *req;
2366 obd_count page_count = 0;
2367 struct osc_async_page *oap = NULL, *tmp;
2368 struct osc_brw_async_args *aa;
2369 struct obd_async_page_ops *ops;
2370 CFS_LIST_HEAD(rpc_list);
2371 unsigned int ending_offset;
2372 unsigned starting_offset = 0;
2376 /* If there are HP OAPs we need to handle at least 1 of them,
2377 * move it to the beginning of the pending list for that. */
2378 if (!list_empty(&lop->lop_urgent)) {
2379 oap = list_entry(lop->lop_urgent.next,
2380 struct osc_async_page, oap_urgent_item);
2381 if (oap->oap_async_flags & ASYNC_HP)
2382 list_move(&oap->oap_pending_item, &lop->lop_pending);
2385 /* first we find the pages we're allowed to work with */
2386 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2387 ops = oap->oap_caller_ops;
2389 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2390 "magic 0x%x\n", oap, oap->oap_magic);
2392 if (page_count != 0 &&
2393 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2394 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2395 " oap %p, page %p, srvlock %u\n",
2396 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2399 /* in llite being 'ready' equates to the page being locked
2400 * until completion unlocks it. commit_write submits a page
2401 * as not ready because its unlock will happen unconditionally
2402 * as the call returns. if we race with commit_write giving
2403 * us that page we don't want to create a hole in the page
2404 * stream, so we stop and leave the rpc to be fired by
2405 * another dirtier or kupdated interval (the not ready page
2406 * will still be on the dirty list). we could call in
2407 * at the end of ll_file_write to process the queue again. */
2408 if (!(oap->oap_async_flags & ASYNC_READY)) {
2409 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2411 CDEBUG(D_INODE, "oap %p page %p returned %d "
2412 "instead of ready\n", oap,
2416 /* llite is telling us that the page is still
2417 * in commit_write and that we should try
2418 * and put it in an rpc again later. we
2419 * break out of the loop so we don't create
2420 * a hole in the sequence of pages in the rpc
2425 /* the io isn't needed.. tell the checks
2426 * below to complete the rpc with EINTR */
2427 spin_lock(&oap->oap_lock);
2428 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2429 spin_unlock(&oap->oap_lock);
2430 oap->oap_count = -EINTR;
2433 spin_lock(&oap->oap_lock);
2434 oap->oap_async_flags |= ASYNC_READY;
2435 spin_unlock(&oap->oap_lock);
2438 LASSERTF(0, "oap %p page %p returned %d "
2439 "from make_ready\n", oap,
2447 * Page submitted for IO has to be locked. Either by
2448 * ->ap_make_ready() or by higher layers.
2450 #if defined(__KERNEL__) && defined(__linux__)
2451 if(!(PageLocked(oap->oap_page) &&
2452 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2453 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2454 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2458 /* If there is a gap at the start of this page, it can't merge
2459 * with any previous page, so we'll hand the network a
2460 * "fragmented" page array that it can't transfer in 1 RDMA */
2461 if (page_count != 0 && oap->oap_page_off != 0)
2464 /* take the page out of our book-keeping */
2465 list_del_init(&oap->oap_pending_item);
2466 lop_update_pending(cli, lop, cmd, -1);
2467 list_del_init(&oap->oap_urgent_item);
2469 if (page_count == 0)
2470 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2471 (PTLRPC_MAX_BRW_SIZE - 1);
2473 /* ask the caller for the size of the io as the rpc leaves. */
2474 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2476 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2477 if (oap->oap_count <= 0) {
2478 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2480 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2484 /* now put the page back in our accounting */
2485 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2486 if (page_count == 0)
2487 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2488 if (++page_count >= cli->cl_max_pages_per_rpc)
2491 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2492 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2493 * have the same alignment as the initial writes that allocated
2494 * extents on the server. */
2495 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2496 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2497 if (ending_offset == 0)
2500 /* If there is a gap at the end of this page, it can't merge
2501 * with any subsequent pages, so we'll hand the network a
2502 * "fragmented" page array that it can't transfer in 1 RDMA */
2503 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2507 osc_wake_cache_waiters(cli);
2509 if (page_count == 0)
2512 loi_list_maint(cli, loi);
2514 client_obd_list_unlock(&cli->cl_loi_list_lock);
2516 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2518 /* this should happen rarely and is pretty bad, it makes the
2519 * pending list not follow the dirty order */
2520 client_obd_list_lock(&cli->cl_loi_list_lock);
2521 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2522 list_del_init(&oap->oap_rpc_item);
2524 /* queued sync pages can be torn down while the pages
2525 * were between the pending list and the rpc */
2526 if (oap->oap_interrupted) {
2527 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2528 osc_ap_completion(cli, NULL, oap, 0,
2532 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2534 loi_list_maint(cli, loi);
2535 RETURN(PTR_ERR(req));
2538 aa = ptlrpc_req_async_args(req);
2539 if (cmd == OBD_BRW_READ) {
2540 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2541 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2542 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2543 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2545 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2546 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2547 cli->cl_w_in_flight);
2548 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2549 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2551 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2553 client_obd_list_lock(&cli->cl_loi_list_lock);
2555 if (cmd == OBD_BRW_READ)
2556 cli->cl_r_in_flight++;
2558 cli->cl_w_in_flight++;
2560 /* queued sync pages can be torn down while the pages
2561 * were between the pending list and the rpc */
2563 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2564 /* only one oap gets a request reference */
2567 if (oap->oap_interrupted && !req->rq_intr) {
2568 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2570 ptlrpc_mark_interrupted(req);
2574 tmp->oap_request = ptlrpc_request_addref(req);
2576 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2577 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2579 req->rq_interpret_reply = brw_interpret;
2580 ptlrpcd_add_req(req);
/* Debug helper: emits a D_INODE CDEBUG line showing whether LOI is on the
 * ready or hp-ready list, its write and read lop_num_pending counts and
 * whether each urgent list is non-empty, followed by the caller's format STR.
 * NOTE(review): the final continuation line of the macro is not visible in
 * this excerpt. */
2584 #define LOI_DEBUG(LOI, STR, args...) \
2585 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2586 !list_empty(&(LOI)->loi_ready_item) || \
2587 !list_empty(&(LOI)->loi_hp_ready_item), \
2588 (LOI)->loi_write_lop.lop_num_pending, \
2589 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2590 (LOI)->loi_read_lop.lop_num_pending, \
2591 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2594 /* This is called by osc_check_rpcs() to find which objects have pages that
2595 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection order visible below: an object on cl_loi_hp_ready_list, then one
 * on cl_loi_ready_list; then, only while cl_cache_waiters is non-empty, the
 * head of cl_loi_write_list; finally, when the import is missing or invalid,
 * the head of cl_loi_write_list or of cl_loi_read_list.  The chosen object
 * is only looked up with list_entry(); it is not unlinked here.
 * NOTE(review): the fall-through return is not visible in this excerpt;
 * osc_check_rpcs() loops until this returns NULL - confirm. */
2596 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2599 /* First return objects that have blocked locks so that they
2600 * will be flushed quickly and other clients can get the lock,
2601 * then objects which have pages ready to be stuffed into RPCs */
2602 if (!list_empty(&cli->cl_loi_hp_ready_list))
2603 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2604 struct lov_oinfo, loi_hp_ready_item));
2605 if (!list_empty(&cli->cl_loi_ready_list))
2606 RETURN(list_entry(cli->cl_loi_ready_list.next,
2607 struct lov_oinfo, loi_ready_item));
2609 /* then if we have cache waiters, return all objects with queued
2610 * writes. This is especially important when many small files
2611 * have filled up the cache and not been fired into rpcs because
2612 * they don't pass the nr_pending/object threshold */
2613 if (!list_empty(&cli->cl_cache_waiters) &&
2614 !list_empty(&cli->cl_loi_write_list))
2615 RETURN(list_entry(cli->cl_loi_write_list.next,
2616 struct lov_oinfo, loi_write_item));
2618 /* then return all queued objects when we have an invalid import
2619 * so that they get flushed */
2620 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2621 if (!list_empty(&cli->cl_loi_write_list))
2622 RETURN(list_entry(cli->cl_loi_write_list.next,
2623 struct lov_oinfo, loi_write_item));
2624 if (!list_empty(&cli->cl_loi_read_list))
2625 RETURN(list_entry(cli->cl_loi_read_list.next,
2626 struct lov_oinfo, loi_read_item));
/* Tell whether cli already has as many RPCs in flight as it is allowed.
 * hprpc is taken from the ASYNC_HP flag of the first page on the object's
 * urgent write list or, if that did not set it, of the first page on its
 * urgent read list.  It is added to cl_max_rpcs_in_flight, so a queued
 * high-priority page raises the limit by one.  Returns non-zero when
 * rpcs_in_flight(cli) has reached that limit. */
2631 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2633 struct osc_async_page *oap;
2636 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2637 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2638 struct osc_async_page, oap_urgent_item);
2639 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2642 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2643 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2644 struct osc_async_page, oap_urgent_item);
2645 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2648 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
/* Walk the objects returned by osc_next_loi() and try to build RPCs from
 * their pending pages: a write RPC when lop_makes_rpc() accepts the write
 * queue and a read RPC when it accepts the read queue, each through
 * osc_send_oap_rpc().  Each visited object is then unlinked from the
 * hp-ready, ready, write and read lists and handed to loi_list_maint().
 * NOTE(review): the statements executed when osc_max_rpc_in_flight() is
 * true, after osc_send_oap_rpc() returns, and when race_counter reaches 10
 * are not visible in this excerpt - presumably they leave the loop; confirm. */
2651 /* called with the loi list lock held */
2652 static void osc_check_rpcs(struct client_obd *cli)
2654 struct lov_oinfo *loi;
2655 int rc = 0, race_counter = 0;
2658 while ((loi = osc_next_loi(cli)) != NULL) {
2659 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2661 if (osc_max_rpc_in_flight(cli, loi))
2664 /* attempt some read/write balancing by alternating between
2665 * reads and writes in an object. The makes_rpc checks here
2666 * would be redundant if we were getting read/write work items
2667 * instead of objects. we don't want send_oap_rpc to drain a
2668 * partial read pending queue when we're given this object to
2669 * do io on writes while there are cache waiters */
2670 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2671 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2672 &loi->loi_write_lop);
2680 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2681 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2682 &loi->loi_read_lop);
2691 /* attempt some inter-object balancing by issuing rpcs
2692 * for each object in turn */
2693 if (!list_empty(&loi->loi_hp_ready_item))
2694 list_del_init(&loi->loi_hp_ready_item);
2695 if (!list_empty(&loi->loi_ready_item))
2696 list_del_init(&loi->loi_ready_item);
2697 if (!list_empty(&loi->loi_write_item))
2698 list_del_init(&loi->loi_write_item);
2699 if (!list_empty(&loi->loi_read_item))
2700 list_del_init(&loi->loi_read_item);
2702 loi_list_maint(cli, loi);
2704 /* send_oap_rpc fails with 0 when make_ready tells it to
2705 * back off. llite's make_ready does this when it tries
2706 * to lock a page queued for write that is already locked.
2707 * we want to try sending rpcs from many objects, but we
2708 * don't want to spin failing with 0. */
2709 if (race_counter == 10)
2715 /* we're trying to queue a page in the osc so we're subject to the
2716 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2717 * If the osc's queued pages are already at that limit, then we want to sleep
2718 * until there is space in the osc's queue for us. We also may be waiting for
2719 * write credits from the OST if there are RPCs in flight that may return some
2720 * before we fall back to sync writes.
2722 * We need this to know our allocation was granted in the presence of signals */
/* Wait condition used by osc_enter_cache(): evaluated under
 * cl_loi_list_lock, rc is true once ocw->ocw_entry is an empty list head
 * (list_empty()) or rpcs_in_flight(cli) is zero.
 * NOTE(review): the declaration and return of rc are not visible in this
 * excerpt. */
2723 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2727 client_obd_list_lock(&cli->cl_loi_list_lock);
2728 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2729 client_obd_list_unlock(&cli->cl_loi_list_lock);
2733 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2734 * grant or cache space. */
/* Visible flow: take the "force sync io" exit when cl_dirty_max is below one
 * page or a force_sync flag is set on the client or the object; otherwise
 * consume write grant for the page when the per-client dirty limit, the
 * global obd_max_dirty_pages limit and cl_avail_grant all have room for one
 * more page.  Failing that, and only while write RPCs are in flight, an
 * on-stack osc_cache_waiter is queued on cl_cache_waiters, osc_check_rpcs()
 * is run, the list lock is dropped and the caller sleeps until
 * ocw_granted() is true; the waiter is unlinked afterwards if it is still
 * queued.
 * NOTE(review): the return codes of each path are not visible in this
 * excerpt. */
2735 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2736 struct osc_async_page *oap)
2738 struct osc_cache_waiter ocw;
2739 struct l_wait_info lwi = { 0 };
2742 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2743 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2744 cli->cl_dirty_max, obd_max_dirty_pages,
2745 cli->cl_lost_grant, cli->cl_avail_grant);
2747 /* force the caller to try sync io. this can jump the list
2748 * of queued writes and create a discontiguous rpc stream */
2749 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2750 loi->loi_ar.ar_force_sync)
2753 /* Hopefully normal case - cache space and write credits available */
2754 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2755 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2756 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2757 /* account for ourselves */
2758 osc_consume_write_grant(cli, &oap->oap_brw_page);
2762 /* Make sure that there are write rpcs in flight to wait for. This
2763 * is a little silly as this object may not have any pending but
2764 * other objects sure might. */
2765 if (cli->cl_w_in_flight) {
2766 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2767 cfs_waitq_init(&ocw.ocw_waitq);
2771 loi_list_maint(cli, loi);
2772 osc_check_rpcs(cli);
2773 client_obd_list_unlock(&cli->cl_loi_list_lock);
2775 CDEBUG(D_CACHE, "sleeping for cache space\n");
2776 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2778 client_obd_list_lock(&cli->cl_loi_list_lock);
2779 if (!list_empty(&ocw.ocw_entry)) {
2780 list_del(&ocw.ocw_entry);
/* Find the DLM lock to test - from lockh when it is a used handle, otherwise
 * (as far as visible) from the oap_ldlm_lock of the async page in *res,
 * referenced under oap_lock - and check it against [start, end] with
 * ldlm_lock_fast_match().  When release == 1 and the match returned 1, the
 * mode reference (LCK_PW for OBD_BRW_WRITE, else LCK_PR) is dropped again
 * through ldlm_lock_decref().  The lock reference is released with
 * LDLM_LOCK_PUT() at the end.
 * NOTE(review): where release is set, the NULL-lock exit and the return
 * statements are not visible in this excerpt. */
2789 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2790 void **res, int rw, obd_off start, obd_off end,
2791 struct lustre_handle *lockh, int flags)
2793 struct ldlm_lock *lock = NULL;
2794 int rc, release = 0;
2798 if (lockh && lustre_handle_is_used(lockh)) {
2799 /* if a valid lockh is passed, just check that the corresponding
2800 * lock covers the extent */
2801 lock = ldlm_handle2lock(lockh);
2804 struct osc_async_page *oap = *res;
2805 spin_lock(&oap->oap_lock);
2806 lock = oap->oap_ldlm_lock;
2808 LDLM_LOCK_GET(lock);
2809 spin_unlock(&oap->oap_lock);
2811 /* lock can be NULL in case of a race between obd_get_lock and lock cancel,
2812 * so we should not try to match it */
2813 if (unlikely(!lock))
2816 rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2817 if (release == 1 && rc == 1)
2818 /* if a valid lockh was passed, we just need to check
2819 * that the lock covers the page, no reference should be
2821 ldlm_lock_decref(lockh,
2822 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2823 LDLM_LOCK_PUT(lock);
/* Set up an osc_async_page for page at object offset offset: the magic,
 * owning client_obd, caller ops/data, the four list heads, the interrupt
 * callback and the spinlock are initialised.  Unless OBD_PAGE_NO_CACHE is
 * set in flags, the page is also passed to cache_add_extent() on the
 * client's cl_cache, under the resource name built from loi_id/loi_gr.
 * One visible path returns size_round(sizeof(*oap)) instead.
 * NOTE(review): the condition guarding that early return, where oap is
 * obtained from, and how cache_add_extent()'s rc is handled are not visible
 * in this excerpt. */
2827 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2828 struct lov_oinfo *loi, cfs_page_t *page,
2829 obd_off offset, struct obd_async_page_ops *ops,
2830 void *data, void **res, int flags,
2831 struct lustre_handle *lockh)
2833 struct osc_async_page *oap;
2834 struct ldlm_res_id oid = {{0}};
2840 return size_round(sizeof(*oap));
2843 oap->oap_magic = OAP_MAGIC;
2844 oap->oap_cli = &exp->exp_obd->u.cli;
2847 oap->oap_caller_ops = ops;
2848 oap->oap_caller_data = data;
2850 oap->oap_page = page;
2851 oap->oap_obj_off = offset;
2853 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2854 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2855 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2856 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2858 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2860 spin_lock_init(&oap->oap_lock);
2862 /* If the page was marked as notcacheable - don't add to any locks */
2863 if (!(flags & OBD_PAGE_NO_CACHE)) {
2864 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2865 /* This is the only place where we can call cache_add_extent
2866 without oap_lock, because this page is locked now, and
2867 the lock we are adding it to is referenced, so cannot lose
2868 any pages either. */
2869 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2874 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Interpret an opaque cookie as an osc_async_page.  Returns
 * ERR_PTR(-EINVAL) when its oap_magic field does not equal OAP_MAGIC.
 * NOTE(review): the success return is not visible in this excerpt -
 * presumably the oap itself. */
2878 struct osc_async_page *oap_from_cookie(void *cookie)
2880 struct osc_async_page *oap = cookie;
2881 if (oap->oap_magic != OAP_MAGIC)
2882 return ERR_PTR(-EINVAL);
/* Queue one async page (identified by cookie) for cmd.
 * Visible flow: validate the cookie, check the import and that the page is
 * not already on a pending, urgent or rpc list, and - for writes without
 * OBD_BRW_NOQUOTA - consult lquota_chkdq() with the uid/gid filled in by
 * ap_fill_obdo().  Under cl_loi_list_lock the page's offset, count, brw
 * flags (plus OBD_BRW_MEMALLOC under memory pressure) and async flags are
 * recorded; writes go through osc_enter_cache(); the page is then put on the
 * pending queue with osc_oap_to_pending() and osc_check_rpcs() is run before
 * the lock is released.
 * NOTE(review): the error codes of the early checks, the condition under
 * which loi falls back to lsm->lsm_oinfo[0], and the handling of
 * osc_enter_cache()'s rc are not visible in this excerpt. */
2886 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2887 struct lov_oinfo *loi, void *cookie,
2888 int cmd, obd_off off, int count,
2889 obd_flag brw_flags, enum async_flags async_flags)
2891 struct client_obd *cli = &exp->exp_obd->u.cli;
2892 struct osc_async_page *oap;
2896 oap = oap_from_cookie(cookie);
2898 RETURN(PTR_ERR(oap));
2900 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2903 if (!list_empty(&oap->oap_pending_item) ||
2904 !list_empty(&oap->oap_urgent_item) ||
2905 !list_empty(&oap->oap_rpc_item))
2908 /* check if the file's owner/group is over quota */
2909 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2910 struct obd_async_page_ops *ops;
2917 ops = oap->oap_caller_ops;
2918 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2919 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2929 loi = lsm->lsm_oinfo[0];
2931 client_obd_list_lock(&cli->cl_loi_list_lock);
2934 oap->oap_page_off = off;
2935 oap->oap_count = count;
2936 oap->oap_brw_flags = brw_flags;
2937 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2938 if (libcfs_memory_pressure_get())
2939 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2940 spin_lock(&oap->oap_lock);
2941 oap->oap_async_flags = async_flags;
2942 spin_unlock(&oap->oap_lock);
2944 if (cmd & OBD_BRW_WRITE) {
2945 rc = osc_enter_cache(cli, loi, oap);
2947 client_obd_list_unlock(&cli->cl_loi_list_lock);
2952 osc_oap_to_pending(oap);
2953 loi_list_maint(cli, loi);
2955 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2958 osc_check_rpcs(cli);
2959 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* True when flag is clear in was but set in now, i.e. the flag is being newly
 * set; equivalent to (~was & now & flag) used as a truth value.  Every
 * argument is parenthesised so that compound expressions such as
 * (FLAG_A | FLAG_B) are evaluated as a unit.  flag is evaluated twice, so do
 * not pass expressions with side effects. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Add ASYNC_READY and/or ASYNC_URGENT to a page that is already queued.
 * Asserts that the page is not OBD_BRW_SRVLOCK and that this is not a
 * liblustre client.  Under cl_loi_list_lock and oap_lock: fails with -EINVAL
 * when the page is not on a pending list; sets ASYNC_READY when it is newly
 * requested; when ASYNC_URGENT is newly requested and the page is not in an
 * RPC, links it on lop_urgent (at the head for ASYNC_HP pages, otherwise at
 * the tail) and calls loi_list_maint().  In the visible code
 * osc_check_rpcs() runs before the list lock is dropped.
 * NOTE(review): the action taken when all requested flags are already set,
 * the position of the "out" label and the return statements are not visible
 * in this excerpt. */
2967 static int osc_set_async_flags(struct obd_export *exp,
2968 struct lov_stripe_md *lsm,
2969 struct lov_oinfo *loi, void *cookie,
2970 obd_flag async_flags)
2972 struct client_obd *cli = &exp->exp_obd->u.cli;
2973 struct loi_oap_pages *lop;
2974 struct osc_async_page *oap;
2978 oap = oap_from_cookie(cookie);
2980 RETURN(PTR_ERR(oap));
2983 * bug 7311: OST-side locking is only supported for liblustre for now
2984 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2985 * implementation has to handle case where OST-locked page was picked
2986 * up by, e.g., ->writepage().
2988 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2989 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2992 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2996 loi = lsm->lsm_oinfo[0];
2998 if (oap->oap_cmd & OBD_BRW_WRITE) {
2999 lop = &loi->loi_write_lop;
3001 lop = &loi->loi_read_lop;
3004 client_obd_list_lock(&cli->cl_loi_list_lock);
3005 /* oap_lock provides atomic semantics of oap_async_flags access */
3006 spin_lock(&oap->oap_lock);
3007 if (list_empty(&oap->oap_pending_item))
3008 GOTO(out, rc = -EINVAL);
3010 if ((oap->oap_async_flags & async_flags) == async_flags)
3013 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3014 oap->oap_async_flags |= ASYNC_READY;
3016 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3017 list_empty(&oap->oap_rpc_item)) {
3018 if (oap->oap_async_flags & ASYNC_HP)
3019 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3021 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3022 oap->oap_async_flags |= ASYNC_URGENT;
3023 loi_list_maint(cli, loi);
3026 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3027 oap->oap_async_flags);
3029 spin_unlock(&oap->oap_lock);
3030 osc_check_rpcs(cli);
3031 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queue one page (identified by cookie) on the object's group list.
 * After the cookie, import and "not already queued" checks, the page's
 * offset, count, brw flags (plus OBD_BRW_MEMALLOC under memory pressure) and
 * async flags are recorded under cl_loi_list_lock and the page is appended
 * to lop_pending_group of the write or read queue, chosen from cmd.  When
 * ASYNC_GROUP_SYNC is set, the page's oap_occ is added to oig through
 * oig_add_one().  No RPC is started here; osc_trigger_group_io() moves the
 * group list to the pending queue.
 * NOTE(review): the error codes of the early checks and the final return
 * are not visible in this excerpt. */
3035 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
3036 struct lov_oinfo *loi,
3037 struct obd_io_group *oig, void *cookie,
3038 int cmd, obd_off off, int count,
3040 obd_flag async_flags)
3042 struct client_obd *cli = &exp->exp_obd->u.cli;
3043 struct osc_async_page *oap;
3044 struct loi_oap_pages *lop;
3048 oap = oap_from_cookie(cookie);
3050 RETURN(PTR_ERR(oap));
3052 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3055 if (!list_empty(&oap->oap_pending_item) ||
3056 !list_empty(&oap->oap_urgent_item) ||
3057 !list_empty(&oap->oap_rpc_item))
3061 loi = lsm->lsm_oinfo[0];
3063 client_obd_list_lock(&cli->cl_loi_list_lock);
3066 oap->oap_page_off = off;
3067 oap->oap_count = count;
3068 oap->oap_brw_flags = brw_flags;
3069 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3070 if (libcfs_memory_pressure_get())
3071 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3072 spin_lock(&oap->oap_lock);
3073 oap->oap_async_flags = async_flags;
3074 spin_unlock(&oap->oap_lock);
3076 if (cmd & OBD_BRW_WRITE)
3077 lop = &loi->loi_write_lop;
3079 lop = &loi->loi_read_lop;
3081 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
3082 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
3084 rc = oig_add_one(oig, &oap->oap_occ);
3087 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
3088 oap, oap->oap_page, rc);
3090 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Move every page on lop->lop_pending_group to the regular pending queue
 * through osc_oap_to_pending(), then call loi_list_maint() on the object.
 * cmd is not referenced by the visible code. */
3095 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3096 struct loi_oap_pages *lop, int cmd)
3098 struct list_head *pos, *tmp;
3099 struct osc_async_page *oap;
3101 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3102 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3103 list_del(&oap->oap_pending_item);
3104 osc_oap_to_pending(oap);
3106 loi_list_maint(cli, loi);
/* Start io for pages queued on the object's group lists: under
 * cl_loi_list_lock both the write and the read lop_pending_group lists are
 * moved to the pending queues by osc_group_to_pending(), then
 * osc_check_rpcs() is run.
 * NOTE(review): the condition under which loi falls back to
 * lsm->lsm_oinfo[0] and the return value are not visible in this excerpt. */
3109 static int osc_trigger_group_io(struct obd_export *exp,
3110 struct lov_stripe_md *lsm,
3111 struct lov_oinfo *loi,
3112 struct obd_io_group *oig)
3114 struct client_obd *cli = &exp->exp_obd->u.cli;
3118 loi = lsm->lsm_oinfo[0];
3120 client_obd_list_lock(&cli->cl_loi_list_lock);
3122 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3123 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3125 osc_check_rpcs(cli);
3126 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Detach a page (identified by cookie) from the OSC queues.
 * Under cl_loi_list_lock: a page that is linked on an rpc list is refused
 * with -EBUSY.  Otherwise osc_exit_cache() is called for the page and cache
 * waiters are woken, the page is unlinked from the urgent list (clearing
 * ASYNC_URGENT and ASYNC_HP) and from the pending list (with
 * lop_update_pending(-1)), loi_list_maint() is called, and the page is
 * removed from the client's cl_cache with cache_remove_extent().
 * NOTE(review): the position of the "out" label and the return statement
 * are not visible in this excerpt. */
3131 static int osc_teardown_async_page(struct obd_export *exp,
3132 struct lov_stripe_md *lsm,
3133 struct lov_oinfo *loi, void *cookie)
3135 struct client_obd *cli = &exp->exp_obd->u.cli;
3136 struct loi_oap_pages *lop;
3137 struct osc_async_page *oap;
3141 oap = oap_from_cookie(cookie);
3143 RETURN(PTR_ERR(oap));
3146 loi = lsm->lsm_oinfo[0];
3148 if (oap->oap_cmd & OBD_BRW_WRITE) {
3149 lop = &loi->loi_write_lop;
3151 lop = &loi->loi_read_lop;
3154 client_obd_list_lock(&cli->cl_loi_list_lock);
3156 if (!list_empty(&oap->oap_rpc_item))
3157 GOTO(out, rc = -EBUSY);
3159 osc_exit_cache(cli, oap, 0);
3160 osc_wake_cache_waiters(cli);
3162 if (!list_empty(&oap->oap_urgent_item)) {
3163 list_del_init(&oap->oap_urgent_item);
3164 spin_lock(&oap->oap_lock);
3165 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3166 spin_unlock(&oap->oap_lock);
3169 if (!list_empty(&oap->oap_pending_item)) {
3170 list_del_init(&oap->oap_pending_item);
3171 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3173 loi_list_maint(cli, loi);
3174 cache_remove_extent(cli->cl_cache, oap);
3176 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3178 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Lock callback handling the LDLM_CB_BLOCKING and LDLM_CB_CANCELING cases.
 * A data pointer strictly between 0 and 0x1000 is reported as bad data.
 * LDLM_CB_BLOCKING: the lock is cancelled through ldlm_cli_cancel() and a
 * failure is logged.  LDLM_CB_CANCELING: nothing is done for a lock whose
 * requested mode differs from its granted mode; otherwise the lock is
 * removed from the client's cl_cache with cache_remove_lock() and, when set,
 * cl_ext_lock_cancel_cb is invoked with the same arguments.
 * NOTE(review): the fourth parameter, the switch statement itself and the
 * return statements are not visible in this excerpt. */
3182 int osc_extent_blocking_cb(struct ldlm_lock *lock,
3183 struct ldlm_lock_desc *new, void *data,
3186 struct lustre_handle lockh = { 0 };
3190 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
3191 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3196 case LDLM_CB_BLOCKING:
3197 ldlm_lock2handle(lock, &lockh);
3198 rc = ldlm_cli_cancel(&lockh);
3200 CERROR("ldlm_cli_cancel failed: %d\n", rc);
3202 case LDLM_CB_CANCELING: {
3204 ldlm_lock2handle(lock, &lockh);
3205 /* This lock wasn't granted, don't try to do anything */
3206 if (lock->l_req_mode != lock->l_granted_mode)
3209 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3212 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3213 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3214 lock, new, data,flag);
3223 EXPORT_SYMBOL(osc_extent_blocking_cb);
/* Store data as l_ast_data on the lock behind lockh and copy the
 * LDLM_FL_NO_LRU bit from flags into l_flags, all under
 * lock_res_and_lock().  On Linux kernel builds, when the lock already
 * carries a different l_ast_data, both values are treated as struct inode
 * pointers and the old inode is asserted to be in I_FREEING state.  The
 * reference taken by ldlm_handle2lock() is put at the end.
 * NOTE(review): the third parameter and the guard around the CERROR
 * (presumably a NULL lock) are not visible in this excerpt. */
3225 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3228 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3231 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3234 lock_res_and_lock(lock);
3235 #if defined (__KERNEL__) && defined (__linux__)
3236 /* Liang XXX: Darwin and Winnt checking should be added */
3237 if (lock->l_ast_data && lock->l_ast_data != data) {
3238 struct inode *new_inode = data;
3239 struct inode *old_inode = lock->l_ast_data;
3240 if (!(old_inode->i_state & I_FREEING))
3241 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3242 LASSERTF(old_inode->i_state & I_FREEING,
3243 "Found existing inode %p/%lu/%u state %lu in lock: "
3244 "setting data to %p/%lu/%u\n", old_inode,
3245 old_inode->i_ino, old_inode->i_generation,
3247 new_inode, new_inode->i_ino, new_inode->i_generation);
3250 lock->l_ast_data = data;
3251 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3252 unlock_res_and_lock(lock);
3253 LDLM_LOCK_PUT(lock);
/* Run the iterator replace, with data, over the resource named after the
 * object id and group of lsm, in the namespace of the export's obd device
 * (ldlm_resource_iterate()).
 * NOTE(review): the return value is not visible in this excerpt. */
3256 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3257 ldlm_iterator_t replace, void *data)
3259 struct ldlm_res_id res_id;
3260 struct obd_device *obd = class_exp2obd(exp);
3262 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3263 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* OSC-side completion of a lock enqueue.
 * When rc is ELDLM_LOCK_ABORTED, the ldlm_reply is read from the request
 * and a non-zero lock_policy_res1 replaces rc.  When the enqueue succeeded
 * (rc == 0), or was an aborted intent enqueue, the size, blocks and mtime
 * from the object's loi_lvb are logged.  The lock handle is passed to
 * cache_add_lock() on the client's cl_cache, and finally the caller's
 * oi_cb_up() callback is run with rc; its result becomes the new rc.
 * NOTE(review): the guards around the reply inspection and around
 * cache_add_lock(), and the return statement, are not visible in this
 * excerpt. */
3267 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3268 struct obd_info *oinfo, int intent, int rc)
3273 /* The request was created before ldlm_cli_enqueue call. */
3274 if (rc == ELDLM_LOCK_ABORTED) {
3275 struct ldlm_reply *rep;
3277 /* swabbed by ldlm_cli_enqueue() */
3278 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3279 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3281 LASSERT(rep != NULL);
3282 if (rep->lock_policy_res1)
3283 rc = rep->lock_policy_res1;
3287 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3288 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3289 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3290 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3291 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3295 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3297 /* Call the update callback. */
3298 rc = oinfo->oi_cb_up(oinfo, rc);
/* Reply handler installed by osc_enqueue() for asynchronous enqueues.
 * It resolves the lock from the stored handle (taking a reference),
 * completes the DLM side with ldlm_cli_enqueue_fini(), then the OSC side
 * with osc_enqueue_fini().  When the handle is in use and rc is ELDLM_OK the
 * mode reference is dropped with ldlm_lock_decref().  The lock is asserted
 * to be non-NULL and the reference taken at the top is put.
 * NOTE(review): the remaining parameters and the return statement are not
 * visible in this excerpt. */
3302 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3305 struct osc_enqueue_args *aa = data;
3306 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3307 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3308 struct ldlm_lock *lock;
3310 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3312 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3314 /* Complete obtaining the lock procedure. */
3315 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3317 &aa->oa_oi->oi_flags,
3318 &lsm->lsm_oinfo[0]->loi_lvb,
3319 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3320 lustre_swab_ost_lvb,
3321 aa->oa_oi->oi_lockh, rc);
3323 /* Complete osc stuff. */
3324 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3326 /* Release the lock for async request. */
3327 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3328 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3330 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3331 aa->oa_oi->oi_lockh, req, aa);
3332 LDLM_LOCK_PUT(lock);
3336 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3337 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3338 * other synchronous requests, however keeping some locks and trying to obtain
3339 * others may take a considerable amount of time in a case of ost failure; and
3340 * when other sync requests do not get released lock from a client, the client
3341 * is excluded from the cluster -- such scenarios make life difficult, so
3342 * release locks just after they are obtained. */
/* Acquire an extent lock for the object described by oinfo->oi_md.
 * Visible flow:
 *  - the requested extent is widened to page boundaries;
 *  - ldlm_lock_match() is tried first with LDLM_FL_LVB_READY added; on the
 *    match path the callback data is set through osc_set_data_with_check(),
 *    oi_cb_up() is called with ELDLM_OK and, per the comments below,
 *    references are adjusted when a PW lock satisfied a PR request or when
 *    the request is asynchronous;
 *  - otherwise ldlm_cli_enqueue() is issued.  On the asynchronous path the
 *    request gets osc_enqueue_interpret() as reply handler and is added to
 *    rqset; on the synchronous path the result is finished by
 *    osc_enqueue_fini() and the request is released.
 * NOTE(review): several guards (around the match branch, the intent request
 * preparation and the rqset test) and the return statements are not visible
 * in this excerpt. */
3343 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3344 struct ldlm_enqueue_info *einfo,
3345 struct ptlrpc_request_set *rqset)
3347 struct ldlm_res_id res_id;
3348 struct obd_device *obd = exp->exp_obd;
3349 struct ldlm_reply *rep;
3350 struct ptlrpc_request *req = NULL;
3351 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3356 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3357 oinfo->oi_md->lsm_object_gr, &res_id);
3358 /* Filesystem lock extents are extended to page boundaries so that
3359 * dealing with the page cache is a little smoother. */
3360 oinfo->oi_policy.l_extent.start -=
3361 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3362 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3364 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3367 /* Next, search for already existing extent locks that will cover us */
3368 /* If we're trying to read, we also search for an existing PW lock. The
3369 * VFS and page cache already protect us locally, so lots of readers/
3370 * writers can share a single PW lock.
3372 * There are problems with conversion deadlocks, so instead of
3373 * converting a read lock to a write lock, we'll just enqueue a new
3376 * At some point we should cancel the read lock instead of making them
3377 * send us a blocking callback, but there are problems with canceling
3378 * locks out from other users right now, too. */
3379 mode = einfo->ei_mode;
3380 if (einfo->ei_mode == LCK_PR)
3382 mode = ldlm_lock_match(obd->obd_namespace,
3383 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3384 einfo->ei_type, &oinfo->oi_policy, mode,
3387 /* addref the lock only if not async requests and PW lock is
3388 * matched whereas we asked for PR. */
3389 if (!rqset && einfo->ei_mode != mode)
3390 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3391 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3394 /* I would like to be able to ASSERT here that rss <=
3395 * kms, but I can't, for reasons which are explained in
3399 /* We already have a lock, and it's referenced */
3400 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3402 /* For async requests, decref the lock. */
3403 if (einfo->ei_mode != mode)
3404 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3406 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3414 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3415 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
3416 [DLM_LOCKREQ_OFF + 1] = 0 };
3418 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3422 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3423 size[DLM_REPLY_REC_OFF] =
3424 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3425 ptlrpc_req_set_repsize(req, 3, size);
3428 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3429 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3431 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3432 &oinfo->oi_policy, &oinfo->oi_flags,
3433 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3434 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3435 lustre_swab_ost_lvb, oinfo->oi_lockh,
3439 struct osc_enqueue_args *aa;
3440 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3441 aa = ptlrpc_req_async_args(req);
3446 req->rq_interpret_reply = osc_enqueue_interpret;
3447 ptlrpc_set_add_req(rqset, req);
3448 } else if (intent) {
3449 ptlrpc_req_finished(req);
3454 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3456 ptlrpc_req_finished(req);
/* Look for an existing lock covering the extent in policy, without
 * enqueuing a new one.  The extent is first widened to page boundaries (the
 * caller's policy is modified in place) and OBD_FAIL_OSC_MATCH can force an
 * -EIO return.  ldlm_lock_match() is called with LDLM_FL_LVB_READY added to
 * the caller's flags.  On the match path data is attached with
 * osc_set_data_with_check(), and unless LDLM_FL_TEST_LOCK is set a match in
 * a mode different from the requested one has its PW reference exchanged
 * for a PR reference.
 * NOTE(review): the last parameter, how the mode passed to
 * ldlm_lock_match() is prepared, the guard on the match result, the
 * n_matches update and the return statement are not visible in this
 * excerpt. */
3461 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3462 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3463 int *flags, void *data, struct lustre_handle *lockh,
3466 struct ldlm_res_id res_id;
3467 struct obd_device *obd = exp->exp_obd;
3468 int lflags = *flags;
3472 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3474 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3476 /* Filesystem lock extents are extended to page boundaries so that
3477 * dealing with the page cache is a little smoother */
3478 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3479 policy->l_extent.end |= ~CFS_PAGE_MASK;
3481 /* Next, search for already existing extent locks that will cover us */
3482 /* If we're trying to read, we also search for an existing PW lock. The
3483 * VFS and page cache already protect us locally, so lots of readers/
3484 * writers can share a single PW lock. */
3488 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3489 &res_id, type, policy, rc, lockh);
3491 osc_set_data_with_check(lockh, data, lflags);
3492 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3493 ldlm_lock_addref(lockh, LCK_PR);
3494 ldlm_lock_decref(lockh, LCK_PW);
3496 if (n_matches != NULL)
/* Drop one reference of the given mode on lockh.  LCK_GROUP locks go
 * through ldlm_lock_decref_and_cancel(); the other visible call is a plain
 * ldlm_lock_decref().
 * NOTE(review): the last parameter and the return value are not visible in
 * this excerpt. */
3503 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3504 __u32 mode, struct lustre_handle *lockh, int flags,
3509 if (unlikely(mode == LCK_GROUP))
3510 ldlm_lock_decref_and_cancel(lockh, mode);
3512 ldlm_lock_decref(lockh, mode);
/* Cancel unused locks in the namespace of the export's obd device through
 * ldlm_cli_cancel_unused().  resp starts out NULL and can be set to the
 * resource name built from the object id and group of lsm.
 * NOTE(review): the guard around that assignment is not visible in this
 * excerpt - presumably lsm != NULL. */
3517 static int osc_cancel_unused(struct obd_export *exp,
3518 struct lov_stripe_md *lsm, int flags, void *opaque)
3520 struct obd_device *obd = class_exp2obd(exp);
3521 struct ldlm_res_id res_id, *resp = NULL;
3524 resp = osc_build_res_name(lsm->lsm_object_id,
3525 lsm->lsm_object_gr, &res_id);
3528 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Forward a join-LRU request for the locks of @lsm to the DLM.
 *
 * Mirrors osc_cancel_unused(): resp starts out NULL, is pointed at the
 * resource name built from the object id and group of @lsm, and is passed
 * with @join to ldlm_cli_join_lru() on the namespace of the device behind
 * @exp.  The result of that call is returned.
 * NOTE(review): the condition guarding the resp assignment is not visible
 * here. */
3532 static int osc_join_lru(struct obd_export *exp,
3533 struct lov_stripe_md *lsm, int join)
3535 struct obd_device *obd = class_exp2obd(exp);
3536 struct ldlm_res_id res_id, *resp = NULL;
3539 resp = osc_build_res_name(lsm->lsm_object_id,
3540 lsm->lsm_object_gr, &res_id);
3543 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Reply interpreter installed by osc_statfs_async() for OST_STATFS requests.
 *
 * Unpacks the obd_statfs reply, refreshes the DEGRADED, RDONLY and NOSPC
 * bits of cl_oscc.oscc_flags under oscc_lock, copies the reply into
 * aa->aa_oi->oi_osfs and calls the oi_cb_up callback of the obd_info with
 * the current rc.  A reply that cannot be unpacked yields -EPROTO.
 *
 * NOTE(review): the remaining parameters (data, rc), the declaration of
 * used and the out label are on lines that are not visible here. */
3547 static int osc_statfs_interpret(struct ptlrpc_request *req,
3550 struct osc_async_args *aa = data;
3551 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3552 struct obd_statfs *msfs;
3557 /* The request has in fact never been sent
3558 * due to issues at a higher level (LOV).
3559 * Exit immediately since the caller is
3560 * aware of the problem and takes care
3561 * of the clean up */
/* Connection-level failures on a NODELAY request are singled out here.
 * NOTE(review): the statement run under this condition is not visible. */
3564 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3565 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3571 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3572 lustre_swab_obd_statfs);
3574 CERROR("Can't unpack obd_statfs\n");
3575 GOTO(out, rc = -EPROTO);
3578 /* Reinitialize the RDONLY and DEGRADED flags at the client
3579 * on each statfs, so they don't stay set permanently. */
3580 spin_lock(&cli->cl_oscc.oscc_lock);
3582 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3583 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3584 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3585 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3587 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3588 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3589 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3590 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3592 /* Add a bit of hysteresis so this flag isn't continually flapping,
3593 * and ensure that new files don't get extremely fragmented due to
3594 * only a small amount of available space in the filesystem.
3595 * We want to set the NOSPC flag when there is less than ~0.1% free
3596 * and clear it when there is at least ~0.2% free space, so:
3597 * avail < ~0.1% max max = avail + used
3598 * 1025 * avail < avail + used used = blocks - free
3599 * 1024 * avail < used
3600 * 1024 * avail < blocks - free
3601 * avail < ((blocks - free) >> 10)
3603 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3604 * lose that amount of space so in those cases we report no space left
3605 * if there is less than 1 GB left. */
/* NOTE(review): the cap 1 << 30 is compared against os_bavail below while
 * the comment above speaks of 1 GB; confirm the unit os_bavail is
 * expressed in. */
3606 used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
/* Set NOSPC below the low-water marks (fewer than 32 free inodes or
 * os_bavail under the threshold); clear it only above the doubled marks. */
3607 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3608 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3609 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3610 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3611 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3612 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3614 spin_unlock(&cli->cl_oscc.oscc_lock);
3616 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
/* Hand the final status to the upper-layer completion callback. */
3618 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_STATFS request on @rqset.
 *
 * The reply is sized for one struct obd_statfs, the request is directed at
 * OST_CREATE_PORTAL and osc_statfs_interpret() is installed as the reply
 * interpreter.  When OBD_STATFS_NODELAY is set in oinfo->oi_flags the
 * request is marked no-resend and no-delay.
 *
 * NOTE(review): @max_age is not referenced on any visible line, and the
 * allocation-failure check and the aa setup are on lines not visible. */
3622 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3623 __u64 max_age, struct ptlrpc_request_set *rqset)
3625 struct ptlrpc_request *req;
3626 struct osc_async_args *aa;
3627 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3630 /* We could possibly pass max_age in the request (as an absolute
3631 * timestamp or a "seconds.usec ago") so the target can avoid doing
3632 * extra calls into the filesystem if that isn't necessary (e.g.
3633 * during mount that would help a bit). Having relative timestamps
3634 * is not so great if request processing is slow, while absolute
3635 * timestamps are not ideal because they need time synchronization. */
3636 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3637 OST_STATFS, 1, NULL, NULL);
3641 ptlrpc_req_set_repsize(req, 2, size);
3642 req->rq_request_portal = OST_CREATE_PORTAL;
3643 ptlrpc_at_set_req_timeout(req);
3644 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3645 /* procfs requests must not wait on the statfs, to avoid a deadlock */
3646 req->rq_no_resend = 1;
3647 req->rq_no_delay = 1;
3650 req->rq_interpret_reply = osc_statfs_interpret;
/* Compile-time check that the async args fit in the request scratch area. */
3651 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3652 aa = ptlrpc_req_async_args(req);
3655 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: send OST_STATFS and copy the reply into @osfs.
 *
 * A reference on cl_import is taken while holding cl_sem for reading and
 * dropped again once the request has been prepared.  The request goes to
 * OST_CREATE_PORTAL and is waited for with ptlrpc_queue_wait().  With
 * OBD_STATFS_NODELAY in @flags it is marked no-resend and no-delay.  A
 * reply that cannot be unpacked yields -EPROTO.
 *
 * NOTE(review): @max_age is not referenced on any visible line; the NULL
 * checks on imp, req and msfs and the out label are on lines not visible. */
3659 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3660 __u64 max_age, __u32 flags)
3662 struct obd_statfs *msfs;
3663 struct ptlrpc_request *req;
3664 struct obd_import *imp = NULL;
3665 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3669 /* Since the request might also come from lprocfs, we need to
3670 * sync this with client_disconnect_export (Bug15684) */
3671 down_read(&obd->u.cli.cl_sem);
3672 if (obd->u.cli.cl_import)
3673 imp = class_import_get(obd->u.cli.cl_import);
3674 up_read(&obd->u.cli.cl_sem);
3678 /* We could possibly pass max_age in the request (as an absolute
3679 * timestamp or a "seconds.usec ago") so the target can avoid doing
3680 * extra calls into the filesystem if that isn't necessary (e.g.
3681 * during mount that would help a bit). Having relative timestamps
3682 * is not so great if request processing is slow, while absolute
3683 * timestamps are not ideal because they need time synchronization. */
3684 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3685 OST_STATFS, 1, NULL, NULL);
/* The import reference taken above is released right after the request
 * has been prepared. */
3687 class_import_put(imp);
3691 ptlrpc_req_set_repsize(req, 2, size);
3692 req->rq_request_portal = OST_CREATE_PORTAL;
3693 ptlrpc_at_set_req_timeout(req);
3695 if (flags & OBD_STATFS_NODELAY) {
3696 /* procfs requests must not wait on the statfs, to avoid a deadlock */
3697 req->rq_no_resend = 1;
3698 req->rq_no_delay = 1;
3701 rc = ptlrpc_queue_wait(req);
3705 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3706 lustre_swab_obd_statfs);
3708 CERROR("Can't unpack obd_statfs\n");
3709 GOTO(out, rc = -EPROTO);
3712 memcpy(osfs, msfs, sizeof(*osfs));
3716 ptlrpc_req_finished(req);
/* osc_getstripe(): copy the striping description of @lsm out to the user
 * buffer @lump.  Only the v1-sized header is read from user space, the magic
 * must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3, and the reply always
 * reports a stripe count of 1 together with the object id of @lsm.
 * NOTE(review): the original comment below names the buffer @lmmu while the
 * parameter is called lump. */
3720 /* Retrieve object striping information.
3722 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3723 * the maximum number of OST indices which will fit in the user buffer.
3724 * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
3726 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3728 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3729 struct lov_user_md_v3 lum, *lumk;
3730 int rc = 0, lum_size;
3731 struct lov_user_ost_data_v1 *lmm_objects;
3737 /* we only need the header part from user space to get lmm_magic and
3738 * lmm_stripe_count, (the header part is common to v1 and v3) */
3739 lum_size = sizeof(struct lov_user_md_v1);
3740 memset(&lum, 0x00, sizeof(lum));
3741 if (copy_from_user(&lum, lump, lum_size))
3744 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3745 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3748 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3749 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3750 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3751 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3753 /* we can use lov_mds_md_size() to compute lum_size
3754 * because lov_user_md_vX and lov_mds_md_vX have the same size */
/* The caller asked for object entries: allocate a buffer sized by the
 * stripe count read from user space and fill in the first object slot. */
3755 if (lum.lmm_stripe_count > 0) {
3756 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3757 OBD_ALLOC(lumk, lum_size);
/* The object array sits at a different offset in the v1 and v3 layouts,
 * hence the cast for v1. */
3760 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3761 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3763 lmm_objects = &(lumk->lmm_objects[0]);
3764 lmm_objects->l_object_id = lsm->lsm_object_id;
3766 lum_size = lov_mds_md_size(0, lum.lmm_magic);
/* The reply always describes exactly one stripe: this object. */
3770 lumk->lmm_magic = lum.lmm_magic;
3771 lumk->lmm_stripe_count = 1;
3772 lumk->lmm_object_id = lsm->lsm_object_id;
3774 if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
3775 (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
3776 /* lsm not in host order, so the count also needs to be in the same order */
3777 __swab32s(&lumk->lmm_magic);
3778 __swab16s(&lumk->lmm_stripe_count);
3779 lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
3780 if (lum.lmm_stripe_count > 0)
3781 lustre_swab_lov_user_md_objects(
3782 (struct lov_user_md_v1*)lumk);
3785 if (copy_to_user(lump, lumk, lum_size))
/* NOTE(review): lumk is only visibly allocated in the stripe_count > 0
 * branch; confirm the guard on this free, which is not visible here. */
3789 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device behind @exp.
 *
 * A module reference is taken with try_module_get() for the duration of the
 * call and released with module_put() at the end.  Visible commands:
 *   OBD_IOC_LOV_GET_CONFIG  - fill a single-target lov_desc and the device
 *                             uuid into the user ioctl buffer
 *   LL_IOC_LOV_SETSTRIPE    - obd_alloc_memmd() on @karg
 *   LL_IOC_LOV_GETSTRIPE    - osc_getstripe()
 *   OBD_IOC_CLIENT_RECOVER  - ptlrpc_recover_import()
 *   IOC_OSC_SET_ACTIVE      - ptlrpc_set_import_active()
 *   OBD_IOC_POLL_QUOTACHECK - lquota_poll_check()
 *   OBD_IOC_DESTROY         - osc_destroy(), requires CFS_CAP_SYS_ADMIN
 *   OBD_IOC_PING_TARGET     - ptlrpc_obd_ping()
 * Any other command is logged and answered with -ENOTTY.
 *
 * NOTE(review): the switch statement, the out label and the return are on
 * lines that are not visible here. */
3795 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3796 void *karg, void *uarg)
3798 struct obd_device *obd = exp->exp_obd;
3799 struct obd_ioctl_data *data = karg;
/* NOTE(review): unlike the other messages in this file, the one below has
 * no trailing newline. */
3803 if (!try_module_get(THIS_MODULE)) {
3804 CERROR("Can't get module. Is it alive?");
3808 case OBD_IOC_LOV_GET_CONFIG: {
3810 struct lov_desc *desc;
3811 struct obd_uuid uuid;
/* Pull the whole ioctl buffer in from user space; it is released with
 * obd_ioctl_freedata() on every visible path below. */
3815 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3816 GOTO(out, err = -EINVAL);
3818 data = (struct obd_ioctl_data *)buf;
/* Both inline buffers must be large enough for what is written below. */
3820 if (sizeof(*desc) > data->ioc_inllen1) {
3821 obd_ioctl_freedata(buf, len);
3822 GOTO(out, err = -EINVAL);
3825 if (data->ioc_inllen2 < sizeof(uuid)) {
3826 obd_ioctl_freedata(buf, len);
3827 GOTO(out, err = -EINVAL);
/* Describe this OSC as a one-target configuration. */
3830 desc = (struct lov_desc *)data->ioc_inlbuf1;
3831 desc->ld_tgt_count = 1;
3832 desc->ld_active_tgt_count = 1;
3833 desc->ld_default_stripe_count = 1;
3834 desc->ld_default_stripe_size = 0;
3835 desc->ld_default_stripe_offset = 0;
3836 desc->ld_pattern = 0;
3837 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3839 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3841 err = copy_to_user((void *)uarg, buf, len);
3844 obd_ioctl_freedata(buf, len);
3847 case LL_IOC_LOV_SETSTRIPE:
3848 err = obd_alloc_memmd(exp, karg);
3852 case LL_IOC_LOV_GETSTRIPE:
3853 err = osc_getstripe(karg, uarg);
3855 case OBD_IOC_CLIENT_RECOVER:
3856 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3861 case IOC_OSC_SET_ACTIVE:
3862 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3865 case OBD_IOC_POLL_QUOTACHECK:
3866 err = lquota_poll_check(quota_interface, exp,
3867 (struct if_quotacheck *)karg);
3869 case OBD_IOC_DESTROY: {
/* Destroying objects through ioctl is restricted to CFS_CAP_SYS_ADMIN. */
3872 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3873 GOTO (out, err = -EPERM);
3874 oa = &data->ioc_obdo1;
3877 GOTO(out, err = -EINVAL);
3879 oa->o_valid |= OBD_MD_FLGROUP;
3881 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3884 case OBD_IOC_PING_TARGET:
3885 err = ptlrpc_obd_ping(obd);
3888 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3889 cmd, cfs_curproc_comm());
3890 GOTO(out, err = -ENOTTY);
3893 module_put(THIS_MODULE);
/* Look up the value for @key and store it in @val / @vallen.
 *
 * Visible keys:
 *   KEY_LOCK_TO_STRIPE - sets *vallen to the size of a __u32
 *   KEY_OFF_RPCSIZE    - returns cl_max_pages_per_rpc as a __u64; asserts
 *                        that *vallen is the size of a __u64
 *   KEY_LAST_ID        - sends OST_GET_INFO with the key and copies the
 *                        obd_id from the reply into @val
 *   KEY_FIEMAP         - sends OST_GET_INFO with the key and copies *vallen
 *                        bytes of the reply into @val
 * A reply that cannot be unpacked yields -EPROTO.
 *
 * NOTE(review): the return values, the handling of unknown keys and the
 * out/out1 labels are on lines that are not visible here; @lsm is not used
 * on any visible line. */
3897 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3898 void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3901 if (!vallen || !val)
3904 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3905 __u32 *stripe = val;
3906 *vallen = sizeof(*stripe);
3909 } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
3910 struct client_obd *cli = &exp->exp_obd->u.cli;
3911 __u64 *rpcsize = val;
3912 LASSERT(*vallen == sizeof(__u64));
3913 *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
3915 } else if (KEY_IS(KEY_LAST_ID)) {
3916 struct ptlrpc_request *req;
/* Request buffers: slot 0 is the ptlrpc_body, slot 1 carries the key. */
3918 char *bufs[2] = { NULL, key };
3919 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3922 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3923 OST_GET_INFO, 2, size, bufs);
/* The size array is reused to describe the reply: the record slot is
 * sized by the caller-supplied *vallen. */
3927 size[REPLY_REC_OFF] = *vallen;
3928 ptlrpc_req_set_repsize(req, 2, size);
3929 rc = ptlrpc_queue_wait(req);
3933 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3934 lustre_swab_ost_last_id);
3935 if (reply == NULL) {
3936 CERROR("Can't unpack OST last ID\n");
3937 GOTO(out, rc = -EPROTO);
3939 *((obd_id *)val) = *reply;
3941 ptlrpc_req_finished(req);
3943 } else if (KEY_IS(KEY_FIEMAP)) {
3944 struct ptlrpc_request *req;
3945 struct ll_user_fiemap *reply;
3946 char *bufs[2] = { NULL, key };
3947 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3950 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3951 OST_GET_INFO, 2, size, bufs);
3955 size[REPLY_REC_OFF] = *vallen;
3956 ptlrpc_req_set_repsize(req, 2, size);
3958 rc = ptlrpc_queue_wait(req);
3961 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3962 lustre_swab_fiemap);
3963 if (reply == NULL) {
3964 CERROR("Can't unpack FIEMAP reply.\n");
3965 GOTO(out1, rc = -EPROTO);
3968 memcpy(val, reply, *vallen);
3971 ptlrpc_req_finished(req);
/* Reply interpreter installed by osc_set_info_async() for KEY_MDS_CONN.
 *
 * Looks up the LLOG_MDS_OST_ORIG_CTXT llog context of the import device,
 * runs llog_initiator_connect() on it and drops the context reference.
 * The import is then flagged imp_server_timeout and imp_pingable under
 * imp_lock.
 *
 * NOTE(review): the remaining parameters, the early-exit checks and the
 * return statement are on lines that are not visible here. */
3979 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3982 struct llog_ctxt *ctxt;
3983 struct obd_import *imp = req->rq_import;
3989 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3992 rc = llog_initiator_connect(ctxt);
3994 CERROR("cannot establish connection for "
3995 "ctxt %p: %d\n", ctxt, rc);
3998 llog_ctxt_put(ctxt);
/* Both import flags are changed together under imp_lock. */
3999 spin_lock(&imp->imp_lock);
4000 imp->imp_server_timeout = 1;
4001 imp->imp_pingable = 1;
4002 spin_unlock(&imp->imp_lock);
4003 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Set a configuration value on the OSC and/or forward it to the OST.
 *
 * KEY_NEXT_ID, KEY_INIT_RECOV and KEY_CHECKSUM are applied to local state
 * (oscc_next_id, imp_initial_recov, cl_checksum) after a @vallen check.
 * Every other key is packed together with its value into an OST_SET_INFO
 * request.  KEY_MDS_CONN and KEY_GRANT_SHRINK install their own reply
 * interpreters; the visible code hands the request either to
 * ptlrpcd_add_req() or to @set via ptlrpc_set_add_req() followed by
 * ptlrpc_check_set().
 *
 * NOTE(review): the return statements and the branch that selects between
 * ptlrpcd and @set are on lines that are not visible here. */
4008 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4009 void *key, obd_count vallen, void *val,
4010 struct ptlrpc_request_set *set)
4012 struct ptlrpc_request *req;
4013 struct obd_device *obd = exp->exp_obd;
4014 struct obd_import *imp = class_exp2cliimp(exp);
/* Request buffers: ptlrpc_body, key, value. */
4015 __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
4016 char *bufs[3] = { NULL, key, val };
4019 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4021 if (KEY_IS(KEY_NEXT_ID)) {
4023 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4025 if (vallen != sizeof(obd_id))
4028 /* avoid race between allocate new object and set next id
4029 * from ll_sync thread */
4030 spin_lock(&oscc->oscc_lock);
/* oscc_next_id only ever moves forward: it is raised to the supplied id
 * plus one when that is larger than the current value. */
4031 new_val = *((obd_id*)val) + 1;
4032 if (new_val > oscc->oscc_next_id)
4033 oscc->oscc_next_id = new_val;
4034 spin_unlock(&oscc->oscc_lock);
4036 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4037 exp->exp_obd->obd_name,
4038 oscc->oscc_next_id);
4043 if (KEY_IS(KEY_INIT_RECOV)) {
4044 if (vallen != sizeof(int))
4046 spin_lock(&imp->imp_lock);
4047 imp->imp_initial_recov = *(int *)val;
4048 spin_unlock(&imp->imp_lock);
4049 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
4050 exp->exp_obd->obd_name,
4051 imp->imp_initial_recov);
4055 if (KEY_IS(KEY_CHECKSUM)) {
4056 if (vallen != sizeof(int))
/* Any non-zero value enables checksums; the stored flag is 0 or 1. */
4058 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* Only KEY_GRANT_SHRINK may be issued without a request set. */
4062 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4065 /* We pass all other commands directly to OST. Since nobody calls osc
4066 methods directly and everybody is supposed to go through LOV, we
4067 assume lov checked invalid values for us.
4068 The only recognised values so far are evict_by_nid and mds_conn.
4069 Even if something bad goes through, we'd get a -EINVAL from OST
4072 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
4077 if (KEY_IS(KEY_MDS_CONN))
4078 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4079 else if (KEY_IS(KEY_GRANT_SHRINK))
4080 req->rq_interpret_reply = osc_shrink_grant_interpret;
4082 if (KEY_IS(KEY_GRANT_SHRINK)) {
4083 struct osc_grant_args *aa;
4086 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4087 aa = ptlrpc_req_async_args(req);
4090 ptlrpc_req_finished(req);
4093 *oa = ((struct ost_body *)val)->oa;
4097 ptlrpc_req_set_repsize(req, 2, size);
4098 ptlrpcd_add_req(req);
4100 ptlrpc_req_set_repsize(req, 1, NULL);
4101 ptlrpc_set_add_req(set, req);
4102 ptlrpc_check_set(set);
/* llog operation tables handed to llog_setup() by osc_llog_init().
 * osc_size_repl_logops only provides lop_cancel (GNU field: value
 * initializer syntax).  osc_mds_ost_orig_logops is left zeroed here and is
 * filled in at module load time by osc_init(). */
4109 static struct llog_operations osc_size_repl_logops = {
4110 lop_cancel: llog_obd_repl_cancel
4113 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts of an OSC device.
 *
 * With obd_llog_cat_process of @disk_obd held, the catalog id is read with
 * llog_get_cat_list(), LLOG_MDS_OST_ORIG_CTXT is set up on that catalog and
 * LLOG_SIZE_REPL_CTXT is set up without a log id.  The catalog id is
 * written back with llog_put_cat_list() before the mutex is released.
 * Failures are reported through CERROR.
 *
 * NOTE(review): the remaining parameters (the visible code uses uuid, idx
 * and index), the error checks and the labels are on lines that are not
 * visible here. */
4114 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4117 struct llog_catid catid;
4118 static char name[32] = CATLIST;
/* Serialize catalog list access on the disk device. */
4124 mutex_down(&disk_obd->obd_llog_cat_process);
4126 rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4128 CERROR("rc: %d\n", rc);
4129 GOTO(out_unlock, rc);
4132 CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4133 obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4134 catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4137 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4138 &catid.lci_logid, &osc_mds_ost_orig_logops);
4140 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4144 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4145 &osc_size_repl_logops);
/* NOTE(review): the ORIG context is looked up again in what appears to be
 * the failure path of the second llog_setup(); what is done with it is on
 * lines not visible here. */
4147 struct llog_ctxt *ctxt =
4148 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4151 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4155 CERROR("osc '%s' tgt '%s' rc=%d\n",
4156 obd->obd_name, disk_obd->obd_name, rc);
4157 CERROR("logid "LPX64":0x%x\n",
4158 catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
4160 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4163 CERROR("rc: %d\n", rc);
4166 mutex_up(&disk_obd->obd_llog_cat_process);
/* Tear down the llog contexts created by osc_llog_init().
 *
 * LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT are each looked up and
 * passed to llog_cleanup(); the two results are kept in rc and rc2.
 * NOTE(review): the NULL checks on ctxt and the way rc and rc2 are combined
 * into the return value are on lines not visible here; @count is not used
 * on any visible line. */
4171 static int osc_llog_finish(struct obd_device *obd, int count)
4173 struct llog_ctxt *ctxt;
4174 int rc = 0, rc2 = 0;
4177 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4179 rc = llog_cleanup(ctxt);
4181 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4183 rc2 = llog_cleanup(ctxt);
/* Reconnect hook: fill in the grant request of the connect data.
 *
 * Only acts when @data is non-NULL and carries OBD_CONNECT_GRANT.  Under
 * cl_loi_list_lock the requested grant is set to cl_avail_grant or, when
 * that is zero, to twice cl_max_pages_per_rpc shifted left by
 * CFS_PAGE_SHIFT, and cl_lost_grant is read out and reset to zero.
 *
 * NOTE(review): the last parameter, the declaration of lost_grant and the
 * return statement are on lines not visible here; @exp and @cluuid are not
 * used on any visible line. */
4190 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4191 struct obd_uuid *cluuid,
4192 struct obd_connect_data *data,
4195 struct client_obd *cli = &obd->u.cli;
4197 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4200 client_obd_list_lock(&cli->cl_loi_list_lock);
/* GNU ?: extension - keep cl_avail_grant when it is non-zero. */
4201 data->ocd_grant = cli->cl_avail_grant ?:
4202 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4203 lost_grant = cli->cl_lost_grant;
4204 cli->cl_lost_grant = 0;
4205 client_obd_list_unlock(&cli->cl_loi_list_lock);
4207 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4208 "cl_lost_grant: %ld\n", data->ocd_grant,
4209 cli->cl_avail_grant, lost_grant);
4210 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4211 " ocd_grant: %d\n", data->ocd_connect_flags,
4212 data->ocd_version, data->ocd_grant);
/* Disconnect the export @exp from its OST.
 *
 * If a LLOG_SIZE_REPL_CTXT context exists it is synced with llog_sync()
 * when cl_conn_count is 1, and the context reference is dropped.  The
 * export is then disconnected with client_disconnect_export(), and once
 * cl_import is NULL the client is removed from the grant shrink list with
 * osc_del_shrink_grant().
 *
 * NOTE(review): the NULL check on ctxt, the else branch and the return
 * statement are on lines that are not visible here. */
4218 static int osc_disconnect(struct obd_export *exp)
4220 struct obd_device *obd = class_exp2obd(exp);
4221 struct llog_ctxt *ctxt;
4224 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4226 if (obd->u.cli.cl_conn_count == 1) {
4227 /* Flush any remaining cancel messages out to the
4229 llog_sync(ctxt, exp);
4231 llog_ctxt_put(ctxt);
4233 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4237 rc = client_disconnect_export(exp);
4239 * Initially we put del_shrink_grant before disconnect_export, but it
4240 * causes the following problem if setup (connect) and cleanup
4241 * (disconnect) are tangled together.
4242 * connect p1 disconnect p2
4243 * ptlrpc_connect_import
4244 * ............... class_manual_cleanup
4247 * ptlrpc_connect_interrupt
4249 * add this client to shrink list
4251 * Bang! pinger trigger the shrink.
4252 * So the osc should be disconnected from the shrink list, after we
4253 * are sure the import has been destroyed. BUG18662
4255 if (obd->u.cli.cl_import == NULL)
4256 osc_del_shrink_grant(&obd->u.cli);
/* React to a state change of the import @imp of @obd.
 *
 * Visible events:
 *   IMP_EVENT_DISCON     - on an import with imp_server_timeout set, mark
 *                          the creator OSCC_FLAG_RECOVERING; reset
 *                          cl_avail_grant and cl_lost_grant to zero and call
 *                          ptlrpc_import_setasync(imp, -1)
 *   IMP_EVENT_INACTIVE   - notify the observer with OBD_NOTIFY_INACTIVE
 *   IMP_EVENT_INVALIDATE - run osc_check_rpcs() under cl_loi_list_lock and
 *                          clean up the namespace with LDLM_FL_LOCAL_ONLY
 *   IMP_EVENT_ACTIVE     - on an import with imp_server_timeout set, clear
 *                          OSCC_FLAG_NOSPC; notify with OBD_NOTIFY_ACTIVE
 *   IMP_EVENT_OCD        - initialise the grant when OBD_CONNECT_GRANT was
 *                          negotiated, switch the request portal when
 *                          OBD_CONNECT_REQPORTAL was negotiated, call
 *                          ptlrpc_import_setasync(imp, 1) and notify with
 *                          OBD_NOTIFY_OCD
 * Any other event is reported through CERROR.
 *
 * NOTE(review): the switch statement, the assignments to cli and the return
 * are on lines that are not visible here. */
4260 static int osc_import_event(struct obd_device *obd,
4261 struct obd_import *imp,
4262 enum obd_import_event event)
4264 struct client_obd *cli;
4268 LASSERT(imp->imp_obd == obd);
4271 case IMP_EVENT_DISCON: {
4272 /* Only do this on the MDS OSC's */
4273 if (imp->imp_server_timeout) {
4274 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4276 spin_lock(&oscc->oscc_lock);
4277 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4278 spin_unlock(&oscc->oscc_lock);
/* Grant accounting is reset to zero once the connection is gone. */
4281 client_obd_list_lock(&cli->cl_loi_list_lock);
4282 cli->cl_avail_grant = 0;
4283 cli->cl_lost_grant = 0;
4284 client_obd_list_unlock(&cli->cl_loi_list_lock);
4285 ptlrpc_import_setasync(imp, -1);
4289 case IMP_EVENT_INACTIVE: {
4290 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4293 case IMP_EVENT_INVALIDATE: {
4294 struct ldlm_namespace *ns = obd->obd_namespace;
4298 client_obd_list_lock(&cli->cl_loi_list_lock);
4299 /* all pages go to failing rpcs due to the invalid import */
4300 osc_check_rpcs(cli);
4301 client_obd_list_unlock(&cli->cl_loi_list_lock);
4303 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4307 case IMP_EVENT_ACTIVE: {
4308 /* Only do this on the MDS OSC's */
4309 if (imp->imp_server_timeout) {
4310 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4312 spin_lock(&oscc->oscc_lock);
4313 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4314 spin_unlock(&oscc->oscc_lock);
4316 CDEBUG(D_INFO, "notify server \n");
4317 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4320 case IMP_EVENT_OCD: {
4321 struct obd_connect_data *ocd = &imp->imp_connect_data;
4323 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4324 osc_init_grant(&obd->u.cli, ocd);
4327 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4328 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4330 ptlrpc_import_setasync(imp, 1);
4331 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4335 CERROR("Unknown import event %d\n", event);
/* osc_cancel_for_recovery(): callback registered on the namespace by
 * osc_setup() through ns_register_cancel().  Group locks and locks on
 * resources that are not of type LDLM_EXTENT take one path; granted modes
 * LCK_PR and LCK_CR take the other.
 * NOTE(review): the return statements are on lines not visible here. */
4341 /* determine whether the lock can be canceled before replaying the lock
4342 * during recovery, see bug16774 for detailed information
4345 * zero - the lock can't be canceled
4346 * other - ok to cancel
4348 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4350 check_res_locked(lock->l_resource);
4351 if (lock->l_granted_mode == LCK_GROUP ||
4352 lock->l_resource->lr_type != LDLM_EXTENT)
4355 /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
4356 if (lock->l_granted_mode == LCK_PR ||
4357 lock->l_granted_mode == LCK_CR)
/* Set up an OSC device.
 *
 * Takes a ptlrpcd reference, runs the generic client_obd_setup(), registers
 * the lprocfs entries, creates the request pool of the import (sized by
 * cl_max_rpcs_in_flight + 2) and the extent cache, initialises the grant
 * shrink list and semaphore, and registers osc_cancel_for_recovery() on the
 * namespace.
 *
 * NOTE(review): the error checks between these steps and the return
 * statement are on lines that are not visible here. */
4363 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4369 rc = ptlrpcd_addref();
4373 rc = client_obd_setup(obd, len, buf);
4377 struct lprocfs_static_vars lvars = { 0 };
4378 struct client_obd *cli = &obd->u.cli;
4380 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4381 lprocfs_osc_init_vars(&lvars);
/* The extra proc entries are only attached when the base lprocfs setup
 * succeeded. */
4382 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4383 lproc_osc_attach_seqstat(obd);
4384 ptlrpc_lprocfs_register_obd(obd);
4388 /* We need to allocate a few requests more, because
4389 brw_interpret tries to create new requests before freeing
4390 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4391 reserved, but I am afraid that might be too much wasted RAM
4392 in fact, so 2 is just my guess and still should work. */
4393 cli->cl_import->imp_rq_pool =
4394 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4396 ptlrpc_add_rqs_to_pool);
4397 cli->cl_cache = cache_create(obd);
4398 if (!cli->cl_cache) {
4402 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4403 sema_init(&cli->cl_grant_sem, 1);
4405 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* Staged pre-cleanup of an OSC device.
 *
 * Visible stages:
 *   OBD_CLEANUP_EARLY   - deactivate the import
 *   OBD_CLEANUP_EXPORTS - under cl_sem held for writing, invalidate and
 *                         destroy an import that is still attached
 *                         (including its request pool), then finish the
 *                         llog contexts with obd_llog_finish()
 *   OBD_CLEANUP_SELF_EXP, OBD_CLEANUP_OBD - no visible work
 *
 * NOTE(review): the switch statement, the break statements and the return
 * are on lines that are not visible here. */
4411 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4417 case OBD_CLEANUP_EARLY: {
4418 struct obd_import *imp;
4419 imp = obd->u.cli.cl_import;
4420 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4421 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4422 ptlrpc_deactivate_import(imp);
4425 case OBD_CLEANUP_EXPORTS: {
4426 /* If we set up but never connected, the
4427 client import will not have been cleaned. */
4428 down_write(&obd->u.cli.cl_sem);
4429 if (obd->u.cli.cl_import) {
4430 struct obd_import *imp;
4431 imp = obd->u.cli.cl_import;
4432 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4434 ptlrpc_invalidate_import(imp);
/* Release the request pool created by osc_setup() before the import
 * itself is destroyed. */
4435 if (imp->imp_rq_pool) {
4436 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4437 imp->imp_rq_pool = NULL;
4439 class_destroy_import(imp);
4440 obd->u.cli.cl_import = NULL;
4442 up_write(&obd->u.cli.cl_sem);
4444 rc = obd_llog_finish(obd, 0);
4446 CERROR("failed to cleanup llogging subsystems\n");
4449 case OBD_CLEANUP_SELF_EXP:
4451 case OBD_CLEANUP_OBD:
/* Final cleanup of an OSC device: unregister the lprocfs entries, release
 * the quota cache, destroy the extent cache created by osc_setup() and run
 * the generic client_obd_cleanup().
 * NOTE(review): what happens after client_obd_cleanup() and the return
 * statement are on lines that are not visible here. */
4457 int osc_cleanup(struct obd_device *obd)
4462 ptlrpc_lprocfs_unregister_obd(obd);
4463 lprocfs_obd_cleanup(obd);
4465 /* free memory of osc quota cache */
4466 lquota_cleanup(quota_interface, obd);
4468 cache_destroy(obd->u.cli.cl_cache);
4469 rc = client_obd_cleanup(obd);
/* Register a page removal callback (and pin callback) on the extent cache
 * of @obd by forwarding to cache_add_extent_removal_cb().
 * NOTE(review): the condition belonging to the existing comment below, and
 * the last argument of the call, are on lines that are not visible here. */
4475 static int osc_register_page_removal_cb(struct obd_device *obd,
4476 obd_page_removal_cb_t func,
4477 obd_pin_extent_cb pin_cb)
4481 /* this server - not need init */
4485 return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
/* Remove the page removal callback @func from the extent cache of @obd by
 * forwarding to cache_del_extent_removal_cb() and returning its result. */
4489 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4490 obd_page_removal_cb_t func)
4493 return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
/* Install @cb as the external lock cancel callback of @obd.
 * Only one callback can be registered: the slot is asserted to be empty
 * before it is written.
 * NOTE(review): the condition belonging to the existing comment below and
 * the return statement are on lines that are not visible here. */
4496 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4497 obd_lock_cancel_cb cb)
4500 LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4502 /* this server - not need init */
4506 obd->u.cli.cl_ext_lock_cancel_cb = cb;
/* Clear the external lock cancel callback of @obd.
 * A mismatch between @cb and the registered callback is reported through
 * CERROR; the slot is reset to NULL on the last visible line.
 * NOTE(review): the statement that ends the mismatch branch and the return
 * statement are on lines that are not visible here. */
4510 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4511 obd_lock_cancel_cb cb)
4515 if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4516 CERROR("Unregistering cancel cb %p, while only %p was "
4518 obd->u.cli.cl_ext_lock_cancel_cb);
4522 obd->u.cli.cl_ext_lock_cancel_cb = NULL;
/* Apply a configuration record to the OSC proc parameters.
 * @buf is treated as a struct lustre_cfg and passed, together with the OSC
 * lprocfs variable table, to class_process_proc_param() under PARAM_OSC.
 * NOTE(review): @len is not used on any visible line; the return statement
 * is on a line not visible here. */
4526 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4528 struct lustre_cfg *lcfg = buf;
4529 struct lprocfs_static_vars lvars = { 0 };
4532 lprocfs_osc_init_vars(&lvars);
4534 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/* Method table of the OSC obd type.  It is passed to
 * class_register_type() and init_obd_quota_ops() by osc_init().
 * Connection management reuses the generic client_* helpers; everything
 * else points at the osc_* implementations of this file. */
4538 struct obd_ops osc_obd_ops = {
4539 .o_owner = THIS_MODULE,
/* device life cycle and connection handling */
4540 .o_setup = osc_setup,
4541 .o_precleanup = osc_precleanup,
4542 .o_cleanup = osc_cleanup,
4543 .o_add_conn = client_import_add_conn,
4544 .o_del_conn = client_import_del_conn,
4545 .o_connect = client_connect_import,
4546 .o_reconnect = osc_reconnect,
4547 .o_disconnect = osc_disconnect,
4548 .o_statfs = osc_statfs,
4549 .o_statfs_async = osc_statfs_async,
/* object metadata and attributes */
4550 .o_packmd = osc_packmd,
4551 .o_unpackmd = osc_unpackmd,
4552 .o_precreate = osc_precreate,
4553 .o_create = osc_create,
4554 .o_create_async = osc_create_async,
4555 .o_destroy = osc_destroy,
4556 .o_getattr = osc_getattr,
4557 .o_getattr_async = osc_getattr_async,
4558 .o_setattr = osc_setattr,
4559 .o_setattr_async = osc_setattr_async,
/* bulk and asynchronous page I/O */
4561 .o_brw_async = osc_brw_async,
4562 .o_prep_async_page = osc_prep_async_page,
4563 .o_get_lock = osc_get_lock,
4564 .o_queue_async_io = osc_queue_async_io,
4565 .o_set_async_flags = osc_set_async_flags,
4566 .o_queue_group_io = osc_queue_group_io,
4567 .o_trigger_group_io = osc_trigger_group_io,
4568 .o_teardown_async_page = osc_teardown_async_page,
4569 .o_punch = osc_punch,
/* DLM locking */
4571 .o_enqueue = osc_enqueue,
4572 .o_match = osc_match,
4573 .o_change_cbdata = osc_change_cbdata,
4574 .o_cancel = osc_cancel,
4575 .o_cancel_unused = osc_cancel_unused,
4576 .o_join_lru = osc_join_lru,
/* control, configuration, llog and callback registration */
4577 .o_iocontrol = osc_iocontrol,
4578 .o_get_info = osc_get_info,
4579 .o_set_info_async = osc_set_info_async,
4580 .o_import_event = osc_import_event,
4581 .o_llog_init = osc_llog_init,
4582 .o_llog_finish = osc_llog_finish,
4583 .o_process_config = osc_process_config,
4584 .o_register_page_removal_cb = osc_register_page_removal_cb,
4585 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4586 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4587 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
/* Module init: hook up the quota interface and register the OSC obd type.
 *
 * Requests the lquota module, resolves osc_quota_interface, initialises it
 * and lets it patch the quota methods into osc_obd_ops, then registers the
 * type with class_register_type().  The symbol reference is dropped again
 * in what appears to be the registration failure path.  Finally
 * osc_mds_ost_orig_logops is built as a copy of llog_lvfs_ops with the
 * setup, cleanup, add and connect operations replaced.
 *
 * NOTE(review): the check on rc and the return statements are on lines that
 * are not visible here. */
4589 int __init osc_init(void)
4591 struct lprocfs_static_vars lvars = { 0 };
4595 lprocfs_osc_init_vars(&lvars);
4597 request_module("lquota");
4598 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4599 lquota_init(quota_interface);
4600 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4602 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4605 if (quota_interface)
4606 PORTAL_SYMBOL_PUT(osc_quota_interface);
/* Start from the lvfs llog operations and override four entries. */
4610 osc_mds_ost_orig_logops = llog_lvfs_ops;
4611 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4612 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4613 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4614 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module exit: shut down the quota interface, drop the symbol reference
 * taken in osc_init() when one is held, and unregister the OSC obd type. */
4620 static void /*__exit*/ osc_exit(void)
4622 lquota_exit(quota_interface);
4623 if (quota_interface)
4624 PORTAL_SYMBOL_PUT(osc_quota_interface);
4626 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, and registration of osc_init() and osc_exit() as the
 * module entry and exit points through cfs_module(). */
4629 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4630 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4631 MODULE_LICENSE("GPL");
4633 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);