1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
/* Quota operation table; expected to be wired to osc_quota_interface at
 * module setup (not visible in this fragment). */
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

/* Forward declarations for BRW helpers defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* NOTE(review): the two declarations below duplicate the quota_interface
 * declarations above.  Verify whether elided preprocessor conditionals
 * made both necessary; otherwise one pair is redundant and can go. */
static quota_interface_t *quota_interface;
extern quota_interface_t osc_quota_interface;

atomic_t osc_resend_time;
/* Pack OSC object metadata for disk storage (LE byte order). */
/* Serializes the single-stripe metadata of @lsm into *lmmp; only the
 * object id is stored, converted to little-endian.  NOTE(review): the
 * branch conditions around the free/alloc paths are elided in this view. */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        /* size of a single-stripe lov_mds_md */
        lmm_size = sizeof(**lmmp);
        OBD_FREE(*lmmp, lmm_size);
        OBD_ALLOC(*lmmp, lmm_size);
        /* the object id must be set before packing */
        LASSERT(lsm->lsm_object_id);
        (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
/* Unpack OSC object metadata from disk storage (LE byte order). */
/* Validates @lmm (minimum size, non-zero object id) and fills a
 * single-stripe *lsmp with the CPU-endian object id; when called with
 * lmm == NULL and an existing *lsmp, the md is freed instead. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
        if (lmm_bytes < sizeof (*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));
        /* XXX LOV_MAGIC etc check? */
        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");
        /* OSC always holds exactly one stripe */
        lsm_size = lov_stripe_md_size(1);
        /* free path: caller passed an existing md and no data to unpack */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
        OBD_ALLOC(*lsmp, lsm_size);
        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
        if ((*lsmp)->lsm_oinfo[0] == NULL) {
                /* undo partial allocation on failure */
                OBD_FREE(*lsmp, lsm_size);
        loi_init((*lsmp)->lsm_oinfo[0]);
        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
        LASSERT((*lsmp)->lsm_object_id);
        /* single stripe: cap object size at the per-stripe maximum */
        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Reply callback for async OST_GETATTR: unpack the ost_body, copy the
 * returned attributes into the caller's obd_info, then invoke the
 * caller's completion callback oi_cb_up. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
        struct ost_body *body;
        struct osc_async_args *aa = data;
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
        /* This should really be sent by the OST */
        aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        /* unpack failed: invalidate the returned attributes */
        CERROR("can't unpack ost_body\n");
        aa->aa_oi->oi_oa->o_valid = 0;
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR on @set; the reply is handled by
 * osc_getattr_interpret() and delivered via oinfo->oi_cb_up. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size,NULL);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;
        /* stash the obd_info in the request's async args for the callback */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send the RPC, wait for the reply, and copy
 * the returned attributes into oinfo->oi_oa. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        ptlrpc_req_set_repsize(req, 2, size);
        rc = ptlrpc_queue_wait(req);
        CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR ("can't unpack ost_body\n");
        GOTO (out, rc = -EPROTO);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push attributes from oinfo->oi_oa to the
 * OST and copy the (possibly updated) attributes back on success. */
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        ptlrpc_req_set_repsize(req, 2, size);
        rc = ptlrpc_queue_wait(req);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        GOTO(out, rc = -EPROTO);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
        ptlrpc_req_finished(req);
/* Reply callback for async OST_SETATTR: unpack the ost_body and copy
 * the returned attributes back before calling the completion hook. */
static int osc_setattr_interpret(struct ptlrpc_request *req,
        struct ost_body *body;
        struct osc_async_args *aa = data;
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);
        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.  One path hands the request to ptlrpcd
 * without waiting for a reply; the other adds it to @rqset with
 * osc_setattr_interpret() as the reply handler.  NOTE(review): the
 * condition selecting between the two paths is elided in this view. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        struct osc_async_args *aa;
        /* NOTE(review): extra buffer handling for 2.0 servers elided here */
        if (osc_exp_is_2_0_server(exp)) {
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, bufcount, size, NULL);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        /* copy the llog cookie from oti when the caller provided one */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds to ost setattr asynchronously */
        /* Do not wait for response. */
        ptlrpcd_add_req(req);
        req->rq_interpret_reply = osc_setattr_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST (synchronous OST_CREATE).  Allocates a
 * stripe md via obd_alloc_memmd, sends the create, and copies the
 * resulting object id/attributes back into @oa and the lsm.  Records
 * the transno and llog cookie into @oti where applicable. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        rc = obd_alloc_memmd(exp, &lsm);
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        GOTO(out, rc = -ENOMEM);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oa);
        ptlrpc_req_set_repsize(req, 2, size);
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        rc = ptlrpc_queue_wait(req);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR ("can't unpack ost_body\n");
        GOTO (out_req, rc = -EPROTO);
        lustre_get_wire_obdo(oa, &body->oa);
        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;
        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way. */
        lsm->lsm_object_id = oa->o_id;
        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
        /* remember the unlink llog cookie returned with the create */
        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                *oti->oti_logcookies = oa->o_lcookie;
        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        ptlrpc_req_finished(req);
        obd_free_memmd(exp, &lsm);
/* Reply callback for async OST_PUNCH (truncate): unpack the ost_body,
 * copy the returned attributes, and call the completion hook. */
static int osc_punch_interpret(struct ptlrpc_request *req,
        struct ost_body *body;
        struct osc_async_args *aa = data;
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
                                  lustre_swab_ost_body);
        CERROR ("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);
        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_PUNCH (truncate) on @rqset; the extent
 * start/end ride in the oa size/blocks fields.  Reply is handled by
 * osc_punch_interpret(). */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        ptlrpc_set_add_req(rqset, req);
/* Reply callback for async OST_SYNC: unpack the ost_body, copy the
 * attributes back, and invoke the completion hook.
 * NOTE(review): copies the obdo with a plain struct assignment instead
 * of lustre_get_wire_obdo() as the other interpret callbacks do --
 * confirm whether wire conversion is needed here. */
static int osc_sync_interpret(struct ptlrpc_request *req,
        struct ost_body *body;
        struct osc_async_args *aa = data;
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR ("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);
        *aa->aa_oi->oi_oa = body->oa;
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_SYNC on @set to flush the byte range
 * start..end of the object; start/end ride in the oa size/blocks
 * fields.  Reply is handled by osc_sync_interpret(). */
static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
                    obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 2, size, NULL);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_sync_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        ptlrpc_set_add_req(set, req);
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        /* resource name is derived from the object id and group */
        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);
/* Reply callback for OST_DESTROY: drop the in-flight destroy count and
 * wake anyone throttled in osc_can_send_destroy(). */
static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve an RPC slot for a destroy: succeeds when the bumped
 * in-flight count stays within cl_max_rpcs_in_flight; otherwise the
 * reservation is undone and the waitq re-signalled to close the race
 * between the inc and the dec. */
static int osc_can_send_destroy(struct client_obd *cli)
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations: re-signal so no waiter is left stranded.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a failed destroy.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
                          sizeof(struct ldlm_request) };
        int count, bufcount = 2;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        LASSERT(oa->o_id != 0);
        /* cancel our local PW locks on the object, discarding cached data */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp))
        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
                                size, REQ_REC_OFF + 1, 0, &cancels, count);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        /* ship the unlink llog cookie along with the destroy */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                oa->o_lcookie = *oti->oti_logcookies;
        lustre_set_wire_obdo(&body->oa, oa);
        ptlrpc_req_set_repsize(req, 2, size);
        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
        /* Do not wait for response */
        ptlrpcd_add_req(req);
/* Fill in the dirty/undirty/grant accounting fields of @oa, under
 * cl_loi_list_lock, so the OST learns how much cache this client holds;
 * sanity-checks the local dirty accounting along the way. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
        /* caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages + 1) {
                /* The atomic_read() allowing the atomic_inc() are not covered
                 * by a lock thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                /* normal case: advertise how much more we could dirty */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check cl_grant_shrink_interval from now. */
static void osc_update_next_shrink(struct client_obd *cli)
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
/* caller must hold loi_list_lock */
/* Charge one page of write grant to @pga: bump the global and
 * per-client dirty counters, consume CFS_PAGE_SIZE of available grant,
 * and tag the page as grant-backed. */
static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
                 cli->cl_avail_grant);
        osc_update_next_shrink(cli);
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
        /* OST block size; fall back to 4k when statfs has not filled it in */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        /* page was never charged against grant: nothing to release */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        cli->cl_lost_grant += CFS_PAGE_SIZE;
        CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
               cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong. Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                count += blocksize - end;
                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
/* Total read + write RPCs currently in flight for this client. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
/* caller must hold loi_list_lock */
/* Walk the cache-waiter list, waking writers that can now either dirty
 * more pages (grant available) or must fall back to sync IO (-EDQUOT). */
void osc_wake_cache_waiters(struct client_obd *cli)
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                cfs_waitq_signal(&ocw->ocw_waitq);
/* Absorb any grant the OST returned in a reply into cl_avail_grant,
 * under the loi list lock. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Forward declaration: used by the grant-shrink path below. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);
/* Reply callback for a grant-shrink set_info RPC.  Restores the grant
 * we tried to give back into cl_avail_grant (NOTE(review): the branch
 * condition guarding the restore is elided in this view) and absorbs
 * any grant in the reply via osc_update_grant(). */
static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
        struct osc_grant_args *aa = data;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = aa->aa_oa;
        struct ost_body *body;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        /* NOTE(review): sizeof(*oa) is used although the buffer holds an
         * ost_body; this only works if ost_body wraps a bare obdo --
         * confirm against the struct definition. */
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
                                  lustre_swab_ost_body);
        osc_update_grant(cli, body);
/* Locally give back a quarter of the available grant: move it from
 * cl_avail_grant into oa->o_grant and flag the obdo so the OST treats
 * it as a grant shrink. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
        /* enough for a full pipeline of RPCs */
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* already at the pipeline limit: shrink down to a single RPC */
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        return osc_shrink_grant_to_target(cli, target);
/* Shrink this client's available grant down to @target, returning the
 * excess to the server via a KEY_GRANT_SHRINK set_info RPC. */
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
        struct ost_body *body;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;
        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        osc_announce_cached(cli, &body->oa, 0);
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* carve off the excess and keep only @target locally */
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        /* reclaim the grant the server did not accept -- NOTE(review):
         * the guarding condition is elided in this view */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += body->oa.o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* shrinking is only considered once avail grant exceeds this limit */
#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time to shrink this client's grant: the shrink
 * deadline has (nearly) arrived, the import is fully connected, and we
 * hold more grant than GRANT_SHRINK_LIMIT. */
static int osc_should_shrink_grant(struct client_obd *client)
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;
        /* allow the check to fire up to 5 ticks early */
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                osc_update_next_shrink(client);
/* Periodic timeout callback: shrink grant for every client on this
 * timeout item's list that is due for a shrink. */
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
        struct client_obd *client;
        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
/* Register this client with the ptlrpc timeout machinery so that
 * osc_grant_shrink_grant_cb() runs every cl_grant_shrink_interval. */
static int osc_add_shrink_grant(struct client_obd *client)
        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        CERROR("add grant client %s error %d\n",
               client->cl_import->imp_obd->obd_name, rc);
        CDEBUG(D_CACHE, "add grant client %s \n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
/* Remove this client from the grant-shrink timeout list. */
static int osc_del_shrink_grant(struct client_obd *client)
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the server's connect data and, when the
 * server supports grant shrinking, join the shrink timeout list. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Zero-fills the unread tail: the page containing the EOF is zeroed
 * past nob_read, and every following page is zeroed entirely. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga, int pshift)
        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);
                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (OSC_FILE2MEM_OFF(pga[i]->off,pshift)&~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                nob_read -= pga[i]->count;
        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) +
                      (OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
/* Verify the per-niobuf RC vector in a BRW_WRITE reply: fail on any
 * negative rc, complain about unexpected non-zero rcs, and check that
 * the bulk moved exactly the number of bytes requested. */
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");
        /* byte-swap the rcs when the reply came from an opposite-endian peer */
        if (lustre_rep_need_swab(req))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);
        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);
                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw pages may be merged into a single niobuf only when they are
 * file-contiguous; warn if their flags differ in bits not known to be
 * safe to combine. */
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
        return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over @nob bytes spread across @pga pages
 * using @cksum_type.  Fault-injection hooks deliberately corrupt read
 * data (or, for writes, the checksum value) to exercise the checksum
 * recovery paths. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type, int pshift)
        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;
                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                nob -= pga[i]->count;
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
1153 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1154 struct lov_stripe_md *lsm, obd_count page_count,
1155 struct brw_page **pga,
1156 struct ptlrpc_request **reqp, int pshift)
1158 struct ptlrpc_request *req;
1159 struct ptlrpc_bulk_desc *desc;
1160 struct ost_body *body;
1161 struct obd_ioobj *ioobj;
1162 struct niobuf_remote *niobuf;
1163 __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1164 int niocount, i, requested_nob, opc, rc;
1165 struct ptlrpc_request_pool *pool;
1166 struct osc_brw_async_args *aa;
1167 struct brw_page *pg_prev;
1170 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1171 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
1173 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1174 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1176 for (niocount = i = 1; i < page_count; i++) {
1177 if (!can_merge_pages(pga[i - 1], pga[i]))
1181 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1182 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1184 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1189 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1190 ptlrpc_at_set_req_timeout(req);
1192 if (opc == OST_WRITE)
1193 desc = ptlrpc_prep_bulk_imp (req, page_count,
1194 BULK_GET_SOURCE, OST_BULK_PORTAL);
1196 desc = ptlrpc_prep_bulk_imp (req, page_count,
1197 BULK_PUT_SINK, OST_BULK_PORTAL);
1199 GOTO(out, rc = -ENOMEM);
1200 /* NB request now owns desc and will free it when it gets freed */
1202 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1203 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1204 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1205 niocount * sizeof(*niobuf));
1207 lustre_set_wire_obdo(&body->oa, oa);
1208 obdo_to_ioobj(oa, ioobj);
1209 ioobj->ioo_bufcnt = niocount;
1211 LASSERT (page_count > 0);
1213 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1214 struct brw_page *pg = pga[i];
1216 LASSERT(pg->count > 0);
1217 LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
1218 pg->count <= CFS_PAGE_SIZE,
1219 "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
1220 i, pg, pg->off, pg->count, pshift);
1222 LASSERTF(i == 0 || pg->off > pg_prev->off,
1223 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1224 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1226 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1227 pg_prev->pg, page_private(pg_prev->pg),
1228 pg_prev->pg->index, pg_prev->off);
1230 LASSERTF(i == 0 || pg->off > pg_prev->off,
1231 "i %d p_c %u\n", i, page_count);
1233 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1234 (pg->flag & OBD_BRW_SRVLOCK));
1236 ptlrpc_prep_bulk_page(desc, pg->pg,
1237 OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
1239 requested_nob += pg->count;
1241 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1243 niobuf->len += pg->count;
1245 niobuf->offset = pg->off;
1246 niobuf->len = pg->count;
1247 niobuf->flags = pg->flag;
1252 LASSERTF((void *)(niobuf - niocount) ==
1253 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1254 niocount * sizeof(*niobuf)),
1255 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1256 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1257 (void *)(niobuf - niocount));
1259 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1260 if (osc_should_shrink_grant(cli))
1261 osc_shrink_grant_local(cli, &body->oa);
1263 /* size[REQ_REC_OFF] still sizeof (*body) */
1264 if (opc == OST_WRITE) {
1265 if (cli->cl_checksum) {
1266 /* store cl_cksum_type in a local variable since
1267 * it can be changed via lprocfs */
1268 cksum_type_t cksum_type = cli->cl_cksum_type;
1270 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1271 oa->o_flags &= OBD_FL_LOCAL_MASK;
1272 body->oa.o_flags = 0;
1274 body->oa.o_flags |= cksum_type_pack(cksum_type);
1275 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1276 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1279 cksum_type, pshift);
1280 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1282 /* save this in 'oa', too, for later checking */
1283 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1284 oa->o_flags |= cksum_type_pack(cksum_type);
1286 /* clear out the checksum flag, in case this is a
1287 * resend but cl_checksum is no longer set. b=11238 */
1288 oa->o_valid &= ~OBD_MD_FLCKSUM;
1290 oa->o_cksum = body->oa.o_cksum;
1291 /* 1 RC per niobuf */
1292 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1293 ptlrpc_req_set_repsize(req, 3, size);
1295 if (cli->cl_checksum) {
1296 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1297 body->oa.o_flags = 0;
1298 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1299 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1301 /* 1 RC for the whole I/O */
1302 ptlrpc_req_set_repsize(req, 2, size);
1305 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1306 aa = ptlrpc_req_async_args(req);
1308 aa->aa_requested_nob = requested_nob;
1309 aa->aa_nio_count = niocount;
1310 aa->aa_page_count = page_count;
1314 aa->aa_pshift = pshift;
1315 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1321 ptlrpc_req_finished (req);
/*
 * Diagnose a BAD WRITE CHECKSUM after an OST_WRITE reply.
 *
 * Returns quietly when @server_cksum matches @client_cksum.  Otherwise the
 * bulk data is re-checksummed locally (using the checksum type the server
 * reported in oa->o_flags, falling back to CRC32) and the three-way
 * comparison below chooses a message saying where the data most plausibly
 * changed: on the client after the original checksum, in transit before the
 * OST, or both.
 *
 * NOTE(review): this extract has elided lines (the embedded original line
 * numbers jump), so some braces/statements and the return path are not
 * visible here.
 */
1325 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1326 __u32 client_cksum, __u32 server_cksum, int nob,
1327 obd_count page_count, struct brw_page **pga,
1328 cksum_type_t client_cksum_type, int pshift)
1332 cksum_type_t cksum_type;
1334 if (server_cksum == client_cksum) {
1335 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1339 if (oa->o_valid & OBD_MD_FLFLAGS)
1340 cksum_type = cksum_type_unpack(oa->o_flags);
1342 cksum_type = OBD_CKSUM_CRC32;
/* Recompute over the same pages we sent, with the server's checksum type. */
1344 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1345 cksum_type, pshift);
1347 if (cksum_type != client_cksum_type)
1348 msg = "the server did not use the checksum type specified in "
1349 "the original request - likely a protocol problem";
1350 else if (new_cksum == server_cksum)
1351 msg = "changed on the client after we checksummed it - "
1352 "likely false positive due to mmap IO (bug 11742)";
1353 else if (new_cksum == client_cksum)
1354 msg = "changed in transit before arrival at OST";
1356 msg = "changed in transit AND doesn't match the original - "
1357 "likely false positive due to mmap IO (bug 11742)";
/* fid/generation/group fields are only meaningful when flagged in o_valid. */
1359 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1360 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1361 "["LPU64"-"LPU64"]\n",
1362 msg, libcfs_nid2str(peer->nid),
1363 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1364 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1367 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1369 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1370 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1371 "client csum now %x\n", client_cksum, client_cksum_type,
1372 server_cksum, cksum_type, new_cksum);
/*
 * Common completion processing for a bulk brw RPC; both the sync
 * (osc_brw_internal) and async (brw_interpret) paths funnel through here.
 * @rc arrives as the number of bytes transferred, or a negative errno.
 *
 * Unpacks/swabs the reply body, updates per-uid/gid quota flags and the
 * grant state, verifies write checksums via check_write_checksum(), and for
 * reads handles short reads and verifies the server checksum against a
 * locally recomputed one.  Finally the wire obdo is copied back into
 * aa->aa_oa for the caller.
 *
 * NOTE(review): several guard lines and return statements are elided in
 * this extract (embedded line numbers jump).
 */
1377 /* Note rc enters this function as number of bytes transferred */
1378 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1380 struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1381 const lnet_process_id_t *peer =
1382 &req->rq_import->imp_connection->c_peer;
1383 struct client_obd *cli = aa->aa_cli;
1384 struct ost_body *body;
1385 __u32 client_cksum = 0;
/* -EDQUOT is still processed below so the quota flags get updated. */
1388 if (rc < 0 && rc != -EDQUOT)
1391 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1392 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1393 lustre_swab_ost_body);
1395 CERROR ("Can't unpack body\n");
1399 /* set/clear over quota flag for a uid/gid */
1400 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1401 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1402 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1403 body->oa.o_gid, body->oa.o_valid,
1409 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1410 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1412 osc_update_grant(cli, body);
1414 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1416 CERROR ("Unexpected +ve rc %d\n", rc);
1419 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1421 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1422 check_write_checksum(&body->oa, peer, client_cksum,
1423 body->oa.o_cksum, aa->aa_requested_nob,
1424 aa->aa_page_count, aa->aa_ppga,
1425 cksum_type_unpack(aa->aa_oa->o_flags),
/* Writes get one RC per niobuf in the reply; validate them all. */
1429 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1430 aa->aa_page_count, aa->aa_ppga);
1434 /* The rest of this function executes only for OST_READs */
1435 if (rc > aa->aa_requested_nob) {
1436 CERROR("Unexpected rc %d (%d requested)\n", rc,
1437 aa->aa_requested_nob);
1441 if (rc != req->rq_bulk->bd_nob_transferred) {
1442 CERROR ("Unexpected rc %d (%d transferred)\n",
1443 rc, req->rq_bulk->bd_nob_transferred);
1447 if (rc < aa->aa_requested_nob)
1448 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga, aa->aa_pshift);
1450 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1451 static int cksum_counter;
1452 __u32 server_cksum = body->oa.o_cksum;
1455 cksum_type_t cksum_type;
1457 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1458 cksum_type = cksum_type_unpack(body->oa.o_flags);
1460 cksum_type = OBD_CKSUM_CRC32;
1461 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1462 aa->aa_ppga, OST_READ,
1463 cksum_type, aa->aa_pshift);
/* Identify whether the bulk came via a router (sender != server nid). */
1465 if (peer->nid == req->rq_bulk->bd_sender) {
1469 router = libcfs_nid2str(req->rq_bulk->bd_sender);
/* ~0 is treated here as "checksum bit set but no checksum sent". */
1472 if (server_cksum == ~0 && rc > 0) {
1473 CERROR("Protocol error: server %s set the 'checksum' "
1474 "bit, but didn't send a checksum. Not fatal, "
1475 "but please notify on http://bugzilla.lustre.org/\n",
1476 libcfs_nid2str(peer->nid));
1477 } else if (server_cksum != client_cksum) {
1478 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1479 "%s%s%s inum "LPU64"/"LPU64" object "
1480 LPU64"/"LPU64" extent "
1481 "["LPU64"-"LPU64"]\n",
1482 req->rq_import->imp_obd->obd_name,
1483 libcfs_nid2str(peer->nid),
1485 body->oa.o_valid & OBD_MD_FLFID ?
1486 body->oa.o_fid : (__u64)0,
1487 body->oa.o_valid & OBD_MD_FLFID ?
1488 body->oa.o_generation :(__u64)0,
1490 body->oa.o_valid & OBD_MD_FLGROUP ?
1491 body->oa.o_gr : (__u64)0,
1492 aa->aa_ppga[0]->off,
1493 aa->aa_ppga[aa->aa_page_count-1]->off +
1494 aa->aa_ppga[aa->aa_page_count-1]->count -
1496 CERROR("client %x, server %x, cksum_type %x\n",
1497 client_cksum, server_cksum, cksum_type);
1499 aa->aa_oa->o_cksum = client_cksum;
1503 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1506 } else if (unlikely(client_cksum)) {
1507 static int cksum_missed;
/* Log only at power-of-two miss counts to rate-limit the message. */
1510 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1511 CERROR("Checksum %u requested from %s but not sent\n",
1512 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy the server's view of the obdo back for the caller. */
1518 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Issue one synchronous bulk read/write RPC and wait for it to complete.
 * On a bulk timeout with rq_resend set, or any other recoverable error
 * (osc_recoverable_error()), the request is rebuilt and retried, bounded
 * by osc_should_resend(); between attempts it sleeps `resends` seconds via
 * l_wait_event() on a private waitqueue.
 *
 * NOTE(review): the retry-loop braces and several guard lines are elided
 * in this extract.
 */
1523 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1524 struct lov_stripe_md *lsm,
1525 obd_count page_count, struct brw_page **pga)
1527 struct ptlrpc_request *request;
1531 struct l_wait_info lwi;
1534 init_waitqueue_head(&waitq);
1537 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1538 page_count, pga, &request, 0);
1542 rc = ptlrpc_queue_wait(request);
1544 if (rc == -ETIMEDOUT && request->rq_resend) {
1545 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1546 ptlrpc_req_finished(request);
1550 rc = osc_brw_fini_request(request, rc);
1552 ptlrpc_req_finished(request);
1553 if (osc_recoverable_error(rc)) {
1555 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1556 CERROR("too many resend retries, returning error\n");
/* Back off: sleep `resends` seconds before the next attempt. */
1560 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1561 l_wait_event(waitq, 0, &lwi);
/*
 * Rebuild a failed brw RPC as @new_req and queue it on the same request
 * set.  The new request inherits the old one's interpret callback, async
 * args, pga and oap list — the list_head must be spliced, not copied.
 * Bails out early (request freed; return path elided in this extract) if
 * resends are exhausted or if any oap has already been interrupted.
 * The oap handover runs under cl_loi_list_lock; see the comment above
 * ptlrpc_set_add_req() below for why that makes the add safe.
 */
1568 int osc_brw_redo_request(struct ptlrpc_request *request,
1569 struct osc_brw_async_args *aa)
1571 struct ptlrpc_request *new_req;
1572 struct ptlrpc_request_set *set = request->rq_set;
1573 struct osc_brw_async_args *new_aa;
1574 struct osc_async_page *oap;
1578 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1579 CERROR("too many resend retries, returning error\n");
1583 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1585 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1586 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1587 aa->aa_cli, aa->aa_oa,
1588 NULL /* lsm unused by osc currently */,
1589 aa->aa_page_count, aa->aa_ppga, &new_req,
1594 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1596 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1597 if (oap->oap_request != NULL) {
1598 LASSERTF(request == oap->oap_request,
1599 "request %p != oap_request %p\n",
1600 request, oap->oap_request);
1601 if (oap->oap_interrupted) {
1602 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1603 ptlrpc_req_finished(new_req);
1608 /* New request takes over pga and oaps from old request.
1609 * Note that copying a list_head doesn't work, need to move it... */
1611 new_req->rq_interpret_reply = request->rq_interpret_reply;
1612 new_req->rq_async_args = request->rq_async_args;
/* Delay the resend by aa_resends seconds from now. */
1613 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1615 new_aa = ptlrpc_req_async_args(new_req);
1617 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1618 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1619 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Swap each oap's request reference from the old request to the new. */
1621 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1622 if (oap->oap_request) {
1623 ptlrpc_req_finished(oap->oap_request);
1624 oap->oap_request = ptlrpc_request_addref(new_req);
1628 /* use ptlrpc_set_add_req is safe because interpret functions work
1629 * in check_set context. only one way exist with access to request
1630 * from different thread got -EINTR - this way protected with
1631 * cl_loi_list_lock */
1632 ptlrpc_set_add_req(set, new_req);
1634 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1636 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Build one bulk brw RPC and add it to @set without waiting.  For aligned
 * writes (pshift == 0) write grant is consumed up front for each page so
 * that even a sync write cannot overcommit OST space; if request
 * preparation fails, the grants are released again and cache waiters are
 * woken.  On success the read/write page-count and RPCs-in-flight lprocfs
 * histograms are fed and cl_r_in_flight / cl_w_in_flight is bumped; the
 * reply is handled later by brw_interpret().
 *
 * NOTE(review): the trailing parameter (pshift) declaration line and some
 * braces are elided in this extract.
 */
1640 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1641 struct lov_stripe_md *lsm, obd_count page_count,
1642 struct brw_page **pga, struct ptlrpc_request_set *set,
1645 struct ptlrpc_request *request;
1646 struct client_obd *cli = &exp->exp_obd->u.cli;
1648 struct osc_brw_async_args *aa;
1651 /* Consume write credits even if doing a sync write -
1652 * otherwise we may run out of space on OST due to grant. */
1653 /* FIXME: unaligned writes must use write grants too */
1654 if (cmd == OBD_BRW_WRITE && pshift == 0) {
1655 client_obd_list_lock(&cli->cl_loi_list_lock);
1656 for (i = 0; i < page_count; i++) {
1657 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1658 osc_consume_write_grant(cli, pga[i]);
1660 client_obd_list_unlock(&cli->cl_loi_list_lock);
1663 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1664 page_count, pga, &request, pshift);
1666 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1669 aa = ptlrpc_req_async_args(request);
1670 if (cmd == OBD_BRW_READ) {
1671 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1672 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1674 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1675 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1676 cli->cl_w_in_flight);
1678 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1680 LASSERT(list_empty(&aa->aa_oaps));
1682 request->rq_interpret_reply = brw_interpret;
1683 ptlrpc_set_add_req(set, request);
1684 client_obd_list_lock(&cli->cl_loi_list_lock);
1685 if (cmd == OBD_BRW_READ)
1686 cli->cl_r_in_flight++;
1688 cli->cl_w_in_flight++;
1689 client_obd_list_unlock(&cli->cl_loi_list_lock);
1690 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1691 } else if (cmd == OBD_BRW_WRITE) {
/* prep_request failed: give the consumed grant back. */
1692 client_obd_list_lock(&cli->cl_loi_list_lock);
1693 for (i = 0; i < page_count; i++)
1694 osc_release_write_grant(cli, pga[i], 0);
1695 osc_wake_cache_waiters(cli);
1696 client_obd_list_unlock(&cli->cl_loi_list_lock);
1703 * ugh, we want disk allocation on the target to happen in offset order. we'll
1704 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1705 * fine for our small page arrays and doesn't require allocation. its an
1706 * insertion sort that swaps elements that are strides apart, shrinking the
1707 * stride down until its '1' and the array is sorted.
/* Shellsort array[0..num-1] in ascending ->off order (3x+1 stride sequence).
 * NOTE(review): parts of the loop bodies are elided in this extract. */
1709 static void sort_brw_pages(struct brw_page **array, int num)
1712 struct brw_page *tmp;
/* Grow the stride to the largest 3x+1 value below num, then shrink it. */
1716 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1721 for (i = stride ; i < num ; i++) {
1724 while (j >= stride && array[j-stride]->off > tmp->off) {
1725 array[j] = array[j - stride];
1730 } while (stride > 1);
/*
 * Return how many of the leading pages in @pg can be sent in a single
 * unfragmented bulk: counting stops at the first page that does not end on
 * a CFS_PAGE_SIZE boundary, or whose successor does not start on one.
 * OSC_FILE2MEM_OFF(off, pshift) maps a file offset to its in-memory offset
 * before the page-mask test (pshift parameter line elided in this extract).
 */
1733 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1740 LASSERT (pages > 0);
1741 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1745 if (pages == 0) /* that's all */
1748 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1749 return count; /* doesn't end on page boundary */
1752 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1753 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate and return an array of @count pointers into the flat @pga
 * array, so callers can sort/split pages without moving them.  Returns
 * NULL on allocation failure (return statements elided in this extract).
 * Freed by osc_release_ppga().
 */
1760 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1762 struct brw_page **ppga;
1765 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1769 for (i = 0; i < count; i++)
/* Free a pointer array previously built by osc_build_ppga(); @count must
 * match the count it was built with. */
1774 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1776 LASSERT(ppga != NULL);
1777 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk I/O entry point (obd brw method).  With OBD_BRW_CHECK
 * the caller only asks whether I/O has a chance of succeeding, answered
 * from imp_invalid.  Otherwise the flat page array is wrapped in a pointer
 * array, sorted by file offset, and issued as a sequence of synchronous
 * RPCs of at most cl_max_pages_per_rpc unfragmented pages each.  oi_oa is
 * saved before a multi-RPC sequence and restored before each RPC because
 * the brw clobbers it.
 */
1780 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1781 obd_count page_count, struct brw_page *pga,
1782 struct obd_trans_info *oti)
1784 struct obdo *saved_oa = NULL;
1785 struct brw_page **ppga, **orig;
1786 struct obd_import *imp = class_exp2cliimp(exp);
1787 struct client_obd *cli;
1788 int rc, page_count_orig;
1791 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1792 cli = &imp->imp_obd->u.cli;
1794 if (cmd & OBD_BRW_CHECK) {
1795 /* The caller just wants to know if there's a chance that this
1796 * I/O can succeed */
1798 if (imp->imp_invalid)
1803 /* test_brw with a failed create can trip this, maybe others. */
1804 LASSERT(cli->cl_max_pages_per_rpc);
1808 orig = ppga = osc_build_ppga(pga, page_count);
1811 page_count_orig = page_count;
1813 sort_brw_pages(ppga, page_count);
1814 while (page_count) {
1815 obd_count pages_per_brw;
1817 if (page_count > cli->cl_max_pages_per_rpc)
1818 pages_per_brw = cli->cl_max_pages_per_rpc;
1820 pages_per_brw = page_count;
/* Shrink the chunk so the bulk stays unfragmented (single RDMA). */
1822 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);
1824 if (saved_oa != NULL) {
1825 /* restore previously saved oa */
1826 *oinfo->oi_oa = *saved_oa;
1827 } else if (page_count > pages_per_brw) {
1828 /* save a copy of oa (brw will clobber it) */
1829 OBDO_ALLOC(saved_oa);
1830 if (saved_oa == NULL)
1831 GOTO(out, rc = -ENOMEM);
1832 *saved_oa = *oinfo->oi_oa;
1835 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1836 pages_per_brw, ppga);
1841 page_count -= pages_per_brw;
1842 ppga += pages_per_brw;
1846 osc_release_ppga(orig, page_count_orig);
1848 if (saved_oa != NULL)
1849 OBDO_FREE(saved_oa);
/*
 * Asynchronous counterpart of osc_brw(): splits the sorted page array into
 * per-RPC chunks and hands each chunk to async_internal() on @set.  When
 * more than one RPC will fly, each RPC gets a private copy of its ppga
 * slice and of the obdo (tagged OBD_FL_TEMPORARY), because
 * async_internal()/brw_interpret() take ownership and free them on
 * completion.  @pshift supports unaligned direct I/O (one page less per
 * RPC in that case, per the comment below).
 *
 * NOTE(review): the oa-copy allocation and some error-path lines are
 * elided in this extract.
 */
1854 static int osc_brw_async(int cmd, struct obd_export *exp,
1855 struct obd_info *oinfo, obd_count page_count,
1856 struct brw_page *pga, struct obd_trans_info *oti,
1857 struct ptlrpc_request_set *set, int pshift)
1859 struct brw_page **ppga, **orig;
1860 int page_count_orig;
1864 if (cmd & OBD_BRW_CHECK) {
1865 /* The caller just wants to know if there's a chance that this
1866 * I/O can succeed */
1867 struct obd_import *imp = class_exp2cliimp(exp);
1869 if (imp == NULL || imp->imp_invalid)
1874 orig = ppga = osc_build_ppga(pga, page_count);
1877 page_count_orig = page_count;
1879 sort_brw_pages(ppga, page_count);
1880 while (page_count) {
1881 struct brw_page **copy;
1883 obd_count pages_per_brw;
1885 /* one page less under unaligned direct i/o */
1886 pages_per_brw = min_t(obd_count, page_count,
1887 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
1890 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
1893 /* use ppga only if single RPC is going to fly */
1894 if (pages_per_brw != page_count_orig || ppga != orig) {
1895 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1897 GOTO(out, rc = -ENOMEM);
1898 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1902 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1903 GOTO(out, rc = -ENOMEM);
1905 memcpy(oa, oinfo->oi_oa, sizeof(*oa));
/* Mark the copy so brw_interpret() knows to free it. */
1906 oa->o_flags |= OBD_FL_TEMPORARY;
1910 LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
1913 rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
1918 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1920 if (oa->o_flags & OBD_FL_TEMPORARY)
1926 /* we passed it to async_internal() which is
1927 * now responsible for releasing memory */
1931 page_count -= pages_per_brw;
1932 ppga += pages_per_brw;
1936 osc_release_ppga(orig, page_count_orig);
1940 static void osc_check_rpcs(struct client_obd *cli);
1942 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1943 * the dirty accounting. Writeback completes or truncate happens before
1944 * writing starts. Must be called with the loi lock held. */
/* NOTE(review): the trailing parameter (sent, per the call sites) and
 * braces are elided in this extract. */
1945 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1948 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1951 /* This maintains the lists of pending pages to read/write for a given object
1952 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1953 * to quickly find objects that are ready to send an RPC. */
/*
 * Decide whether the pending pages on @lop justify firing an RPC now:
 * never when nothing is pending; always when the import is missing or
 * invalid (to drain queued pages through immediately-failing RPCs); when
 * any urgent page is queued; for writes, when cache waiters are blocked on
 * space; otherwise only once lop_num_pending reaches the 'optimal' RPC
 * size (cl_max_pages_per_rpc).  Return statements are elided in this
 * extract.
 */
1954 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1960 if (lop->lop_num_pending == 0)
1963 /* if we have an invalid import we want to drain the queued pages
1964 * by forcing them through rpcs that immediately fail and complete
1965 * the pages. recovery relies on this to empty the queued pages
1966 * before canceling the locks and evicting down the llite pages */
1967 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1970 /* stream rpcs in queue order as long as as there is an urgent page
1971 * queued. this is our cheap solution for good batching in the case
1972 * where writepage marks some random page in the middle of the file
1973 * as urgent because of, say, memory pressure */
1974 if (!list_empty(&lop->lop_urgent)) {
1975 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1979 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1980 optimal = cli->cl_max_pages_per_rpc;
1981 if (cmd & OBD_BRW_WRITE) {
1982 /* trigger a write rpc stream as long as there are dirtiers
1983 * waiting for space. as they're waiting, they're not going to
1984 * create more pages to coallesce with what's waiting.. */
1985 if (!list_empty(&cli->cl_cache_waiters)) {
1986 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1990 /* +16 to avoid triggering rpcs that would want to include pages
1991 * that are being queued but which can't be made ready until
1992 * the queuer finishes with the page. this is a wart for
1993 * llite::commit_write() */
1996 if (lop->lop_num_pending >= optimal)
/*
 * Report whether the first entry on @lop's urgent list carries ASYNC_HP,
 * i.e. whether a high-priority RPC should be generated for this lop.
 * Return statements are elided in this extract.
 */
2002 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2004 struct osc_async_page *oap;
2007 if (list_empty(&lop->lop_urgent))
2010 oap = list_entry(lop->lop_urgent.next,
2011 struct osc_async_page, oap_urgent_item);
2013 if (oap->oap_async_flags & ASYNC_HP) {
2014 CDEBUG(D_CACHE, "hp request forcing RPC\n");
/*
 * Keep @item's membership of @list in sync with the boolean should_be_on
 * (its parameter declaration line is elided in this extract): link the
 * item at the tail when it should be listed but is not, unlink it when it
 * is listed but should not be.
 */
2021 static void on_list(struct list_head *item, struct list_head *list,
2024 if (list_empty(item) && should_be_on)
2025 list_add_tail(item, list);
2026 else if (!list_empty(item) && !should_be_on)
2027 list_del_init(item);
2030 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2031 * can find pages to build into rpcs quickly */
/*
 * Re-evaluate which client-level scheduling lists @loi belongs on after
 * its page queues changed: hp-ready takes precedence over ready (a loi is
 * never on both), and the write/read lists track whether each lop still
 * has pages pending.
 */
2032 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2034 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2035 lop_makes_hprpc(&loi->loi_read_lop)) {
2037 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2038 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2040 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2041 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2042 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2043 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2046 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2047 loi->loi_write_lop.lop_num_pending);
2049 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2050 loi->loi_read_lop.lop_num_pending);
/*
 * Adjust @lop's pending-page count by @delta (+1 when queuing, -1 when
 * removing) and mirror the change into the client-wide pending write or
 * read page counter, chosen by @cmd.
 */
2053 static void lop_update_pending(struct client_obd *cli,
2054 struct loi_oap_pages *lop, int cmd, int delta)
2056 lop->lop_num_pending += delta;
2057 if (cmd & OBD_BRW_WRITE)
2058 cli->cl_pending_w_pages += delta;
2060 cli->cl_pending_r_pages += delta;
2063 /* this is called when a sync waiter receives an interruption. Its job is to
2064 * get the caller woken as soon as possible. If its page hasn't been put in an
2065 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2066 * desiring interruption which will forcefully complete the rpc once the rpc
/*
 * Interrupt callback for a sync (group I/O) waiter.  If the oap already
 * belongs to an RPC, the request is marked interrupted and ptlrpcd is
 * woken; otherwise the oap is dequeued from its pending/urgent lists, the
 * pending accounting is fixed up, and the group I/O entry is completed
 * with -EINTR.  All of this runs under the client's cl_loi_list_lock.
 */
2068 static void osc_occ_interrupted(struct oig_callback_context *occ)
2070 struct osc_async_page *oap;
2071 struct loi_oap_pages *lop;
2072 struct lov_oinfo *loi;
2075 /* XXX member_of() */
2076 oap = list_entry(occ, struct osc_async_page, oap_occ);
2078 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
2080 oap->oap_interrupted = 1;
2082 /* ok, it's been put in an rpc. only one oap gets a request reference */
2083 if (oap->oap_request != NULL) {
2084 ptlrpc_mark_interrupted(oap->oap_request);
2085 ptlrpcd_wake(oap->oap_request);
2089 /* we don't get interruption callbacks until osc_trigger_group_io()
2090 * has been called and put the sync oaps in the pending/urgent lists.*/
2091 if (!list_empty(&oap->oap_pending_item)) {
2092 list_del_init(&oap->oap_pending_item);
2093 list_del_init(&oap->oap_urgent_item);
2096 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2097 &loi->loi_write_lop : &loi->loi_read_lop;
2098 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2099 loi_list_maint(oap->oap_cli, oap->oap_loi);
2101 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
2102 oap->oap_oig = NULL;
2106 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
2109 /* this is trying to propogate async writeback errors back up to the
2110 * application. As an async write fails we record the error code for later if
2111 * the app does an fsync. As long as errors persist we force future rpcs to be
2112 * sync so that the app can get a sync error and break the cycle of queueing
2113 * pages for which writeback will fail. */
/*
 * @xid identifies the completing request.  On failure (the guard branch is
 * elided in this extract) ar_force_sync is set and ar_min_xid records the
 * next xid to be issued; once a successful completion with xid at or past
 * that point is seen, force_sync is cleared again.
 */
2114 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2121 ar->ar_force_sync = 1;
2122 ar->ar_min_xid = ptlrpc_sample_next_xid();
2127 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2128 ar->ar_force_sync = 0;
/*
 * Queue @oap on its loi's read or write lop: ASYNC_HP pages are put at the
 * head of the urgent list, merely ASYNC_URGENT pages at its tail, and
 * every page joins the tail of the pending list.  The pending-page
 * counters are bumped via lop_update_pending().
 */
2131 static void osc_oap_to_pending(struct osc_async_page *oap)
2133 struct loi_oap_pages *lop;
2135 if (oap->oap_cmd & OBD_BRW_WRITE)
2136 lop = &oap->oap_loi->loi_write_lop;
2138 lop = &oap->oap_loi->loi_read_lop;
2140 if (oap->oap_async_flags & ASYNC_HP)
2141 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2142 else if (oap->oap_async_flags & ASYNC_URGENT)
2143 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2144 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2145 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2148 /* this must be called holding the loi list lock to give coverage to exit_cache,
2149 * async_flag maintenance, and oap_request */
/*
 * Per-page completion: drops the oap's request reference (keeping its xid
 * for the async-error bookkeeping), clears the async flags, records write
 * errors via osc_process_ar() at both client and loi scope, and on success
 * copies blocks/times from @oa into the loi's lvb.  Then either the group
 * I/O entry is completed, or the caller's ap_completion hook runs — whose
 * return may ask for the oap to be re-queued onto the pending lists
 * (branching partly elided in this extract) instead of leaving the cache.
 */
2150 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
2151 struct osc_async_page *oap, int sent, int rc)
2156 if (oap->oap_request != NULL) {
2157 xid = ptlrpc_req_xid(oap->oap_request);
2158 ptlrpc_req_finished(oap->oap_request);
2159 oap->oap_request = NULL;
2162 spin_lock(&oap->oap_lock);
2163 oap->oap_async_flags = 0;
2164 spin_unlock(&oap->oap_lock);
2165 oap->oap_interrupted = 0;
2167 if (oap->oap_cmd & OBD_BRW_WRITE) {
2168 osc_process_ar(&cli->cl_ar, xid, rc);
2169 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2172 if (rc == 0 && oa != NULL) {
2173 if (oa->o_valid & OBD_MD_FLBLOCKS)
2174 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2175 if (oa->o_valid & OBD_MD_FLMTIME)
2176 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2177 if (oa->o_valid & OBD_MD_FLATIME)
2178 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2179 if (oa->o_valid & OBD_MD_FLCTIME)
2180 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2184 osc_exit_cache(cli, oap, sent);
2185 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2186 oap->oap_oig = NULL;
2191 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2192 oap->oap_cmd, oa, rc);
2194 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2195 * I/O on the page could start, but OSC calls it under lock
2196 * and thus we can add oap back to pending safely */
2198 /* upper layer wants to leave the page on pending queue */
2199 osc_oap_to_pending(oap);
2201 osc_exit_cache(cli, oap, sent);
/*
 * Request-set interpret callback for async brw RPCs (installed by
 * async_internal() and osc_build_req()'s callers).  Finishes reply
 * processing via osc_brw_fini_request(), retries recoverable errors with
 * osc_brw_redo_request(), then under cl_loi_list_lock decrements the
 * read/write in-flight counter *before* completing pages, so cache
 * waiters know whether to fall back to sync BRWs.  Pages are completed
 * per-oap on the osc_send_oap_rpc() path, or write grants are released on
 * the async_internal() path; finally waiters are woken, osc_check_rpcs()
 * is pumped, and the ppga is released.
 */
2205 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2207 struct osc_brw_async_args *aa = data;
2208 struct client_obd *cli;
2211 rc = osc_brw_fini_request(request, rc);
2212 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2214 if (osc_recoverable_error(rc)) {
2215 rc = osc_brw_redo_request(request, aa);
2221 client_obd_list_lock(&cli->cl_loi_list_lock);
2222 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2223 * is called so we know whether to go to sync BRWs or wait for more
2224 * RPCs to complete */
2225 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2226 cli->cl_w_in_flight--;
2228 cli->cl_r_in_flight--;
2230 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2231 struct osc_async_page *oap, *tmp;
2232 /* the caller may re-use the oap after the completion call so
2233 * we need to clean it up a little */
2234 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2235 list_del_init(&oap->oap_rpc_item);
2236 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2238 OBDO_FREE(aa->aa_oa);
2239 } else { /* from async_internal() */
2241 for (i = 0; i < aa->aa_page_count; i++)
2242 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
/* Free the per-RPC obdo copy made by osc_brw_async(). */
2244 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2245 OBDO_FREE(aa->aa_oa);
2247 osc_wake_cache_waiters(cli);
2248 osc_check_rpcs(cli);
2249 client_obd_list_unlock(&cli->cl_loi_list_lock);
2251 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Assemble a brw RPC from the oaps on @rpc_list: builds a pga pointing at
 * each oap's brw_page, sorts it by offset, fills an obdo via the caller's
 * ap_fill_obdo hook (adding the DLM lock handle when the oap carries one),
 * and preps the request.  The obdo timestamps are then updated *inside the
 * request buffer*, after the request is built, to win races with setattr
 * as decided by request xid (bug 10150); lockless OBD_BRW_SRVLOCK I/O
 * sends current times rather than possibly stale inode times.  On success
 * the oaps are spliced into the request's async args; on error the pga and
 * obdo are freed (cleanup path partly elided in this extract).
 * Returns the request, or an ERR_PTR on failure.
 */
2256 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2257 struct list_head *rpc_list,
2258 int page_count, int cmd)
2260 struct ptlrpc_request *req;
2261 struct brw_page **pga = NULL;
2262 struct osc_brw_async_args *aa;
2263 struct obdo *oa = NULL;
2264 struct obd_async_page_ops *ops = NULL;
2265 void *caller_data = NULL;
2266 struct osc_async_page *oap;
2267 struct ldlm_lock *lock = NULL;
2272 LASSERT(!list_empty(rpc_list));
2274 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2276 RETURN(ERR_PTR(-ENOMEM));
2280 GOTO(out, req = ERR_PTR(-ENOMEM));
2283 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2285 ops = oap->oap_caller_ops;
2286 caller_data = oap->oap_caller_data;
2287 lock = oap->oap_ldlm_lock;
2289 pga[i] = &oap->oap_brw_page;
2290 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2291 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2292 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2296 /* always get the data for the obdo for the rpc */
2297 LASSERT(ops != NULL);
2298 ops->ap_fill_obdo(caller_data, cmd, oa);
2300 oa->o_handle = lock->l_remote_handle;
2301 oa->o_valid |= OBD_MD_FLHANDLE;
2304 sort_brw_pages(pga, page_count);
2305 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
2307 CERROR("prep_req failed: %d\n", rc);
2308 GOTO(out, req = ERR_PTR(rc));
/* From here on, update the obdo embedded in the request message itself. */
2310 oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2311 sizeof(struct ost_body)))->oa;
2313 /* Need to update the timestamps after the request is built in case
2314 * we race with setattr (locally or in queue at OST). If OST gets
2315 * later setattr before earlier BRW (as determined by the request xid),
2316 * the OST will not use BRW timestamps. Sadly, there is no obvious
2317 * way to do this in a single call. bug 10150 */
2318 if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2319 /* in case of lockless read/write do not use inode's
2320 * timestamps because concurrent stat might fill the
2321 * inode with out-of-date times, send current
2323 if (cmd & OBD_BRW_WRITE) {
2324 oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2325 oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2326 valid = OBD_MD_FLATIME;
2328 oa->o_atime = LTIME_S(CURRENT_TIME);
2329 oa->o_valid |= OBD_MD_FLATIME;
2330 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2333 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2335 ops->ap_update_obdo(caller_data, cmd, oa, valid);
2337 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2338 aa = ptlrpc_req_async_args(req);
2339 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* Move (not copy) the oap list into the request's async args. */
2340 list_splice(rpc_list, &aa->aa_oaps);
2341 CFS_INIT_LIST_HEAD(rpc_list);
2348 OBD_FREE(pga, sizeof(*pga) * page_count);
2353 /* the loi lock is held across this function but it's allowed to release
2354 * and reacquire it during its work */
2356 * prepare pages for ASYNC io and put pages in send queue.
2360 * \param cmd - OBD_BRW_* macros
2361 * \param lop - pending pages
2363 * \return zero if pages successfully add to send queue.
2364 * \return non-zero if an error occurred.
/* NOTE(review): several original source lines appear to be elided from this
 * extract; the comments below describe only what the visible code shows. */
2366 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2367 int cmd, struct loi_oap_pages *lop)
2369 struct ptlrpc_request *req;
2370 obd_count page_count = 0;
2371 struct osc_async_page *oap = NULL, *tmp;
2372 struct osc_brw_async_args *aa;
2373 struct obd_async_page_ops *ops;
2374 CFS_LIST_HEAD(rpc_list);
2375 unsigned int ending_offset;
2376 unsigned starting_offset = 0;
2380 /* If there are HP OAPs we need to handle at least 1 of them,
2381 * move it the beginning of the pending list for that. */
2382 if (!list_empty(&lop->lop_urgent)) {
2383 oap = list_entry(lop->lop_urgent.next,
2384 struct osc_async_page, oap_urgent_item);
2385 if (oap->oap_async_flags & ASYNC_HP)
2386 list_move(&oap->oap_pending_item, &lop->lop_pending);
2389 /* first we find the pages we're allowed to work with */
2390 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2391 ops = oap->oap_caller_ops;
2393 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2394 "magic 0x%x\n", oap, oap->oap_magic);
/* all pages in one RPC must agree on the OBD_BRW_SRVLOCK flag */
2396 if (page_count != 0 &&
2397 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2398 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2399 " oap %p, page %p, srvlock %u\n",
2400 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2403 /* in llite being 'ready' equates to the page being locked
2404 * until completion unlocks it. commit_write submits a page
2405 * as not ready because its unlock will happen unconditionally
2406 * as the call returns. if we race with commit_write giving
2407 * us that page we don't want to create a hole in the page
2408 * stream, so we stop and leave the rpc to be fired by
2409 * another dirtier or kupdated interval (the not ready page
2410 * will still be on the dirty list). we could call in
2411 * at the end of ll_file_write to process the queue again. */
2412 if (!(oap->oap_async_flags & ASYNC_READY)) {
2413 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2415 CDEBUG(D_INODE, "oap %p page %p returned %d "
2416 "instead of ready\n", oap,
2420 /* llite is telling us that the page is still
2421 * in commit_write and that we should try
2422 * and put it in an rpc again later. we
2423 * break out of the loop so we don't create
2424 * a hole in the sequence of pages in the rpc
2429 /* the io isn't needed.. tell the checks
2430 * below to complete the rpc with EINTR */
2431 spin_lock(&oap->oap_lock);
2432 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2433 spin_unlock(&oap->oap_lock);
2434 oap->oap_count = -EINTR;
2437 spin_lock(&oap->oap_lock);
2438 oap->oap_async_flags |= ASYNC_READY;
2439 spin_unlock(&oap->oap_lock);
2442 LASSERTF(0, "oap %p page %p returned %d "
2443 "from make_ready\n", oap,
2451 * Page submitted for IO has to be locked. Either by
2452 * ->ap_make_ready() or by higher layers.
2454 #if defined(__KERNEL__) && defined(__linux__)
2455 if(!(PageLocked(oap->oap_page) &&
2456 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2457 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2458 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2462 /* If there is a gap at the start of this page, it can't merge
2463 * with any previous page, so we'll hand the network a
2464 * "fragmented" page array that it can't transfer in 1 RDMA */
2465 if (page_count != 0 && oap->oap_page_off != 0)
2468 /* take the page out of our book-keeping */
2469 list_del_init(&oap->oap_pending_item);
2470 lop_update_pending(cli, lop, cmd, -1);
2471 list_del_init(&oap->oap_urgent_item);
2473 if (page_count == 0)
2474 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2475 (PTLRPC_MAX_BRW_SIZE - 1);
2477 /* ask the caller for the size of the io as the rpc leaves. */
2478 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2480 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2481 if (oap->oap_count <= 0) {
2482 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2484 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2488 /* now put the page back in our accounting */
2489 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2490 if (page_count == 0)
2491 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2492 if (++page_count >= cli->cl_max_pages_per_rpc)
2495 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2496 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2497 * have the same alignment as the initial writes that allocated
2498 * extents on the server. */
2499 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2500 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2501 if (ending_offset == 0)
2504 /* If there is a gap at the end of this page, it can't merge
2505 * with any subsequent pages, so we'll hand the network a
2506 * "fragmented" page array that it can't transfer in 1 RDMA */
2507 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2511 osc_wake_cache_waiters(cli);
2513 if (page_count == 0)
2516 loi_list_maint(cli, loi);
/* drop the list lock while the RPC is built and sent */
2518 client_obd_list_unlock(&cli->cl_loi_list_lock);
2520 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2522 /* this should happen rarely and is pretty bad, it makes the
2523 * pending list not follow the dirty order */
2524 client_obd_list_lock(&cli->cl_loi_list_lock);
2525 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2526 list_del_init(&oap->oap_rpc_item);
2528 /* queued sync pages can be torn down while the pages
2529 * were between the pending list and the rpc */
2530 if (oap->oap_interrupted) {
2531 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2532 osc_ap_completion(cli, NULL, oap, 0,
2536 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2538 loi_list_maint(cli, loi);
2539 RETURN(PTR_ERR(req));
2542 aa = ptlrpc_req_async_args(req);
/* record per-RPC statistics in the lprocfs read/write histograms */
2543 if (cmd == OBD_BRW_READ) {
2544 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2545 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2546 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2547 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2549 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2550 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2551 cli->cl_w_in_flight);
2552 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2553 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2555 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2557 client_obd_list_lock(&cli->cl_loi_list_lock);
2559 if (cmd == OBD_BRW_READ)
2560 cli->cl_r_in_flight++;
2562 cli->cl_w_in_flight++;
2564 /* queued sync pages can be torn down while the pages
2565 * were between the pending list and the rpc */
2567 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2568 /* only one oap gets a request reference */
2571 if (oap->oap_interrupted && !req->rq_intr) {
2572 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2574 ptlrpc_mark_interrupted(req);
2578 tmp->oap_request = ptlrpc_request_addref(req);
2580 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2581 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
/* hand the request to the ptlrpcd daemon for asynchronous dispatch */
2583 req->rq_interpret_reply = brw_interpret;
2584 ptlrpcd_add_req(req);
/* Debug helper: log an loi's readiness plus the pending/urgent state of its
 * read and write page lists in one CDEBUG(D_INODE) line. */
2588 #define LOI_DEBUG(LOI, STR, args...) \
2589 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2590 !list_empty(&(LOI)->loi_ready_item) || \
2591 !list_empty(&(LOI)->loi_hp_ready_item), \
2592 (LOI)->loi_write_lop.lop_num_pending, \
2593 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2594 (LOI)->loi_read_lop.lop_num_pending, \
2595 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2598 /* This is called by osc_check_rpcs() to find which objects have pages that
2599 * we could be sending. These lists are maintained by lop_makes_rpc().
2599+* Selection order: high-priority objects first, then "ready" objects, then
2599+* (when cache waiters exist, or the import is invalid) queued writes/reads. */
2600 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2603 /* First return objects that have blocked locks so that they
2604 * will be flushed quickly and other clients can get the lock,
2605 * then objects which have pages ready to be stuffed into RPCs */
2606 if (!list_empty(&cli->cl_loi_hp_ready_list))
2607 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2608 struct lov_oinfo, loi_hp_ready_item));
2609 if (!list_empty(&cli->cl_loi_ready_list))
2610 RETURN(list_entry(cli->cl_loi_ready_list.next,
2611 struct lov_oinfo, loi_ready_item));
2613 /* then if we have cache waiters, return all objects with queued
2614 * writes. This is especially important when many small files
2615 * have filled up the cache and not been fired into rpcs because
2616 * they don't pass the nr_pending/object threshold */
2617 if (!list_empty(&cli->cl_cache_waiters) &&
2618 !list_empty(&cli->cl_loi_write_list))
2619 RETURN(list_entry(cli->cl_loi_write_list.next,
2620 struct lov_oinfo, loi_write_item));
2622 /* then return all queued objects when we have an invalid import
2623 * so that they get flushed */
2624 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2625 if (!list_empty(&cli->cl_loi_write_list))
2626 RETURN(list_entry(cli->cl_loi_write_list.next,
2627 struct lov_oinfo, loi_write_item));
2628 if (!list_empty(&cli->cl_loi_read_list))
2629 RETURN(list_entry(cli->cl_loi_read_list.next,
2630 struct lov_oinfo, loi_read_item));
/* Return non-zero when no more RPCs should be started for this client:
 * rpcs_in_flight(cli) has reached cl_max_rpcs_in_flight, with one extra
 * slot allowed when a high-priority (ASYNC_HP) oap heads either the write
 * or read urgent list of @loi. */
2635 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2637 struct osc_async_page *oap;
2640 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2641 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2642 struct osc_async_page, oap_urgent_item);
2643 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
/* only look at the read list if the write list had no HP oap */
2646 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2647 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2648 struct osc_async_page, oap_urgent_item);
2649 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2652 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2655 /* called with the loi list lock held */
/* Walk the objects returned by osc_next_loi() and try to fire read/write
 * RPCs for each, alternating reads and writes for balance, until the
 * max-RPCs-in-flight limit is hit or no object makes progress. */
2656 static void osc_check_rpcs(struct client_obd *cli)
2658 struct lov_oinfo *loi;
2659 int rc = 0, race_counter = 0;
2662 while ((loi = osc_next_loi(cli)) != NULL) {
2663 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2665 if (osc_max_rpc_in_flight(cli, loi))
2668 /* attempt some read/write balancing by alternating between
2669 * reads and writes in an object. The makes_rpc checks here
2670 * would be redundant if we were getting read/write work items
2671 * instead of objects. we don't want send_oap_rpc to drain a
2672 * partial read pending queue when we're given this object to
2673 * do io on writes while there are cache waiters */
2674 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2675 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2676 &loi->loi_write_lop);
2684 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2685 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2686 &loi->loi_read_lop);
2695 /* attempt some inter-object balancing by issuing rpcs
2696 * for each object in turn */
2697 if (!list_empty(&loi->loi_hp_ready_item))
2698 list_del_init(&loi->loi_hp_ready_item);
2699 if (!list_empty(&loi->loi_ready_item))
2700 list_del_init(&loi->loi_ready_item);
2701 if (!list_empty(&loi->loi_write_item))
2702 list_del_init(&loi->loi_write_item);
2703 if (!list_empty(&loi->loi_read_item))
2704 list_del_init(&loi->loi_read_item);
2706 loi_list_maint(cli, loi);
2708 /* send_oap_rpc fails with 0 when make_ready tells it to
2709 * back off. llite's make_ready does this when it tries
2710 * to lock a page queued for write that is already locked.
2711 * we want to try sending rpcs from many objects, but we
2712 * don't want to spin failing with 0. */
2713 if (race_counter == 10)
2719 /* we're trying to queue a page in the osc so we're subject to the
2720 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2721 * If the osc's queued pages are already at that limit, then we want to sleep
2722 * until there is space in the osc's queue for us. We also may be waiting for
2723 * write credits from the OST if there are RPCs in flight that may return some
2724 * before we fall back to sync writes.
2726 * We need this to know our allocation was granted in the presence of signals */
2727 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2731 client_obd_list_lock(&cli->cl_loi_list_lock);
/* granted once removed from the waiter list, or when nothing is in
 * flight (so no further grant can be expected to arrive) */
2732 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2733 client_obd_list_unlock(&cli->cl_loi_list_lock);
2737 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2738 * grant or cache space.
2738+*
2738+* Accounts @oap against the client's dirty-page cache limits; when the
2738+* limits are hit it either forces the caller to sync I/O or blocks as a
2738+* cache waiter until grant/space frees up (interruptible by signals). */
2739 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2740 struct osc_async_page *oap)
2742 struct osc_cache_waiter ocw;
2743 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2746 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2747 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2748 cli->cl_dirty_max, obd_max_dirty_pages,
2749 cli->cl_lost_grant, cli->cl_avail_grant);
2751 /* force the caller to try sync io. this can jump the list
2752 * of queued writes and create a discontiguous rpc stream */
2753 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2754 loi->loi_ar.ar_force_sync)
2757 /* Hopefully normal case - cache space and write credits available */
2758 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2759 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2760 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2761 /* account for ourselves */
2762 osc_consume_write_grant(cli, &oap->oap_brw_page);
2766 /* It is safe to block as a cache waiter as long as there is grant
2767 * space available or the hope of additional grant being returned
2768 * when an in flight write completes. Using the write back cache
2769 * if possible is preferable to sending the data synchronously
2770 * because write pages can then be merged in to large requests.
2771 * The addition of this cache waiter will cause pending write
2772 * pages to be sent immediately. */
2773 if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2774 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2775 cfs_waitq_init(&ocw.ocw_waitq);
2779 loi_list_maint(cli, loi);
2780 osc_check_rpcs(cli);
/* drop the list lock while sleeping; ocw_granted() retakes it */
2781 client_obd_list_unlock(&cli->cl_loi_list_lock);
2783 CDEBUG(D_CACHE, "sleeping for cache space\n");
2784 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2786 client_obd_list_lock(&cli->cl_loi_list_lock);
2787 if (!list_empty(&ocw.ocw_entry)) {
2788 list_del(&ocw.ocw_entry);
/* Check whether an already-held LDLM lock covers the extent [start, end].
 * The lock comes either from @lockh (when valid) or from the oap stored in
 * *res.  NOTE(review): lines appear elided here (e.g. where `release` is
 * set); comments reflect only the visible code. */
2797 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2798 void **res, int rw, obd_off start, obd_off end,
2799 struct lustre_handle *lockh, int flags)
2801 struct ldlm_lock *lock = NULL;
2802 int rc, release = 0;
2806 if (lockh && lustre_handle_is_used(lockh)) {
2807 /* if a valid lockh is passed, just check that the corresponding
2808 * lock covers the extent */
2809 lock = ldlm_handle2lock(lockh);
2812 struct osc_async_page *oap = *res;
/* oap_lock guards oap_ldlm_lock while we take our own reference */
2813 spin_lock(&oap->oap_lock);
2814 lock = oap->oap_ldlm_lock;
2816 LDLM_LOCK_GET(lock);
2817 spin_unlock(&oap->oap_lock);
2819 /* lock can be NULL in case race obd_get_lock vs lock cancel
2820 * so we should not try to match it */
2821 if (unlikely(!lock))
2824 rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2825 if (release == 1 && rc == 1)
2826 /* if a valid lockh was passed, we just need to check
2827 * that the lock covers the page, no reference should be
2829 ldlm_lock_decref(lockh,
2830 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2831 LDLM_LOCK_PUT(lock);
/* Initialize an osc_async_page for @page at @offset within the object and
 * (unless OBD_PAGE_NO_CACHE) register it with the client extent cache.
 * The early `return size_round(sizeof(*oap))` presumably reports the cookie
 * size to the caller when no buffer is supplied — TODO confirm against the
 * elided lines of this extract. */
2835 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2836 struct lov_oinfo *loi, cfs_page_t *page,
2837 obd_off offset, struct obd_async_page_ops *ops,
2838 void *data, void **res, int flags,
2839 struct lustre_handle *lockh)
2841 struct osc_async_page *oap;
2842 struct ldlm_res_id oid = {{0}};
2848 return size_round(sizeof(*oap));
2851 oap->oap_magic = OAP_MAGIC;
2852 oap->oap_cli = &exp->exp_obd->u.cli;
2855 oap->oap_caller_ops = ops;
2856 oap->oap_caller_data = data;
2858 oap->oap_page = page;
2859 oap->oap_obj_off = offset;
2861 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2862 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2863 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2864 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2866 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2868 spin_lock_init(&oap->oap_lock);
2870 /* If the page was marked as notcacheable - don't add to any locks */
2871 if (!(flags & OBD_PAGE_NO_CACHE)) {
2872 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2873 /* This is the only place where we can call cache_add_extent
2874 without oap_lock, because this page is locked now, and
2875 the lock we are adding it to is referenced, so cannot lose
2876 any pages either. */
2877 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2882 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Validate an opaque caller cookie and convert it back into an
 * osc_async_page; returns ERR_PTR(-EINVAL) if the magic doesn't match. */
2886 struct osc_async_page *oap_from_cookie(void *cookie)
2888 struct osc_async_page *oap = cookie;
2889 if (oap->oap_magic != OAP_MAGIC)
2890 return ERR_PTR(-EINVAL);
/* Queue a page (identified by @cookie) for async I/O: checks the import is
 * valid and the oap idle, enforces quota for writes, accounts the page
 * against the dirty cache via osc_enter_cache(), then moves it to the
 * pending list and kicks osc_check_rpcs(). */
2894 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2895 struct lov_oinfo *loi, void *cookie,
2896 int cmd, obd_off off, int count,
2897 obd_flag brw_flags, enum async_flags async_flags)
2899 struct client_obd *cli = &exp->exp_obd->u.cli;
2900 struct osc_async_page *oap;
2904 oap = oap_from_cookie(cookie);
2906 RETURN(PTR_ERR(oap));
2908 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* the oap must not already be on any pending/urgent/rpc list */
2911 if (!list_empty(&oap->oap_pending_item) ||
2912 !list_empty(&oap->oap_urgent_item) ||
2913 !list_empty(&oap->oap_rpc_item))
2916 /* check if the file's owner/group is over quota */
2917 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2918 struct obd_async_page_ops *ops;
2925 ops = oap->oap_caller_ops;
2926 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2927 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2937 loi = lsm->lsm_oinfo[0];
2939 client_obd_list_lock(&cli->cl_loi_list_lock);
2942 oap->oap_page_off = off;
2943 oap->oap_count = count;
2944 oap->oap_brw_flags = brw_flags;
2945 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2946 if (libcfs_memory_pressure_get())
2947 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2948 spin_lock(&oap->oap_lock);
2949 oap->oap_async_flags = async_flags;
2950 spin_unlock(&oap->oap_lock);
2952 if (cmd & OBD_BRW_WRITE) {
2953 rc = osc_enter_cache(cli, loi, oap);
2955 client_obd_list_unlock(&cli->cl_loi_list_lock);
2960 osc_oap_to_pending(oap);
2961 loi_list_maint(cli, loi);
2963 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2966 osc_check_rpcs(cli);
2967 client_obd_list_unlock(&cli->cl_loi_list_lock);
2972 /* aka (~was & now & flag), but this is more clear :) */
/* true when `flag` is newly being set in `now` relative to `was` */
2973 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Raise async flags on a queued oap: ASYNC_READY marks the page sendable,
 * newly-set ASYNC_URGENT moves it onto the urgent list (at the head when it
 * is also ASYNC_HP).  Changes are made under cl_loi_list_lock + oap_lock
 * and followed by an osc_check_rpcs() kick. */
2975 static int osc_set_async_flags(struct obd_export *exp,
2976 struct lov_stripe_md *lsm,
2977 struct lov_oinfo *loi, void *cookie,
2978 obd_flag async_flags)
2980 struct client_obd *cli = &exp->exp_obd->u.cli;
2981 struct loi_oap_pages *lop;
2982 struct osc_async_page *oap;
2986 oap = oap_from_cookie(cookie);
2988 RETURN(PTR_ERR(oap));
2991 * bug 7311: OST-side locking is only supported for liblustre for now
2992 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2993 * implementation has to handle case where OST-locked page was picked
2994 * up by, e.g., ->writepage().
2996 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2997 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
3000 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3004 loi = lsm->lsm_oinfo[0];
3006 if (oap->oap_cmd & OBD_BRW_WRITE) {
3007 lop = &loi->loi_write_lop;
3009 lop = &loi->loi_read_lop;
3012 client_obd_list_lock(&cli->cl_loi_list_lock);
3013 /* oap_lock provides atomic semantics of oap_async_flags access */
3014 spin_lock(&oap->oap_lock);
3015 if (list_empty(&oap->oap_pending_item))
3016 GOTO(out, rc = -EINVAL);
/* nothing to do if every requested flag is already set */
3018 if ((oap->oap_async_flags & async_flags) == async_flags)
3021 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3022 oap->oap_async_flags |= ASYNC_READY;
3024 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3025 list_empty(&oap->oap_rpc_item)) {
3026 if (oap->oap_async_flags & ASYNC_HP)
3027 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3029 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3030 oap->oap_async_flags |= ASYNC_URGENT;
3031 loi_list_maint(cli, loi);
3034 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3035 oap->oap_async_flags);
3037 spin_unlock(&oap->oap_lock);
3038 osc_check_rpcs(cli);
3039 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queue a page on a group-I/O pending list (lop_pending_group) rather than
 * the normal pending list; for ASYNC_GROUP_SYNC pages the oap is also
 * registered with the obd_io_group via oig_add_one(). */
3043 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
3044 struct lov_oinfo *loi,
3045 struct obd_io_group *oig, void *cookie,
3046 int cmd, obd_off off, int count,
3048 obd_flag async_flags)
3050 struct client_obd *cli = &exp->exp_obd->u.cli;
3051 struct osc_async_page *oap;
3052 struct loi_oap_pages *lop;
3056 oap = oap_from_cookie(cookie);
3058 RETURN(PTR_ERR(oap));
3060 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* the oap must not already be on any pending/urgent/rpc list */
3063 if (!list_empty(&oap->oap_pending_item) ||
3064 !list_empty(&oap->oap_urgent_item) ||
3065 !list_empty(&oap->oap_rpc_item))
3069 loi = lsm->lsm_oinfo[0];
3071 client_obd_list_lock(&cli->cl_loi_list_lock);
3074 oap->oap_page_off = off;
3075 oap->oap_count = count;
3076 oap->oap_brw_flags = brw_flags;
3077 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3078 if (libcfs_memory_pressure_get())
3079 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3080 spin_lock(&oap->oap_lock);
3081 oap->oap_async_flags = async_flags;
3082 spin_unlock(&oap->oap_lock);
3084 if (cmd & OBD_BRW_WRITE)
3085 lop = &loi->loi_write_lop;
3087 lop = &loi->loi_read_lop;
3089 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
3090 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
3092 rc = oig_add_one(oig, &oap->oap_occ);
3095 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
3096 oap, oap->oap_page, rc);
3098 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Move every oap from the group pending list onto the regular pending
 * list (via osc_oap_to_pending) and re-sort the object's list placement. */
3103 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3104 struct loi_oap_pages *lop, int cmd)
3106 struct list_head *pos, *tmp;
3107 struct osc_async_page *oap;
3109 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3110 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3111 list_del(&oap->oap_pending_item);
3112 osc_oap_to_pending(oap);
3114 loi_list_maint(cli, loi);
/* Release all group-queued pages (both read and write lists) into the
 * normal pending machinery and kick osc_check_rpcs() to send them. */
3117 static int osc_trigger_group_io(struct obd_export *exp,
3118 struct lov_stripe_md *lsm,
3119 struct lov_oinfo *loi,
3120 struct obd_io_group *oig)
3122 struct client_obd *cli = &exp->exp_obd->u.cli;
3126 loi = lsm->lsm_oinfo[0];
3128 client_obd_list_lock(&cli->cl_loi_list_lock);
3130 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3131 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3133 osc_check_rpcs(cli);
3134 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Remove an oap from all osc book-keeping (cache accounting, urgent and
 * pending lists, extent cache).  Fails with -EBUSY if the page is already
 * part of an in-flight RPC. */
3139 static int osc_teardown_async_page(struct obd_export *exp,
3140 struct lov_stripe_md *lsm,
3141 struct lov_oinfo *loi, void *cookie)
3143 struct client_obd *cli = &exp->exp_obd->u.cli;
3144 struct loi_oap_pages *lop;
3145 struct osc_async_page *oap;
3149 oap = oap_from_cookie(cookie);
3151 RETURN(PTR_ERR(oap));
3154 loi = lsm->lsm_oinfo[0];
3156 if (oap->oap_cmd & OBD_BRW_WRITE) {
3157 lop = &loi->loi_write_lop;
3159 lop = &loi->loi_read_lop;
3162 client_obd_list_lock(&cli->cl_loi_list_lock);
/* cannot tear down a page that is already part of an RPC */
3164 if (!list_empty(&oap->oap_rpc_item))
3165 GOTO(out, rc = -EBUSY);
3167 osc_exit_cache(cli, oap, 0);
3168 osc_wake_cache_waiters(cli);
3170 if (!list_empty(&oap->oap_urgent_item)) {
3171 list_del_init(&oap->oap_urgent_item);
3172 spin_lock(&oap->oap_lock);
3173 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3174 spin_unlock(&oap->oap_lock);
3177 if (!list_empty(&oap->oap_pending_item)) {
3178 list_del_init(&oap->oap_pending_item);
3179 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3181 loi_list_maint(cli, loi);
3182 cache_remove_extent(cli->cl_cache, oap);
3184 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3186 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* LDLM blocking/cancel callback for OSC extent locks.  On LDLM_CB_BLOCKING
 * the lock is cancelled; on LDLM_CB_CANCELING the lock is removed from the
 * client extent cache and any registered cl_ext_lock_cancel_cb is invoked. */
3190 int osc_extent_blocking_cb(struct ldlm_lock *lock,
3191 struct ldlm_lock_desc *new, void *data,
3194 struct lustre_handle lockh = { 0 };
/* small non-NULL `data` values indicate a corrupted ast-data pointer */
3198 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
3199 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3204 case LDLM_CB_BLOCKING:
3205 ldlm_lock2handle(lock, &lockh);
3206 rc = ldlm_cli_cancel(&lockh);
3208 CERROR("ldlm_cli_cancel failed: %d\n", rc);
3210 case LDLM_CB_CANCELING: {
3212 ldlm_lock2handle(lock, &lockh);
3213 /* This lock wasn't granted, don't try to do anything */
3214 if (lock->l_req_mode != lock->l_granted_mode)
3217 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3220 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3221 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3222 lock, new, data,flag);
3231 EXPORT_SYMBOL(osc_extent_blocking_cb);
/* Attach ast-data (an inode pointer on Linux) to the lock behind @lockh,
 * sanity-checking that any existing l_ast_data either matches or belongs
 * to an inode that is being freed. */
3233 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3236 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3239 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3242 lock_res_and_lock(lock);
3243 #if defined (__KERNEL__) && defined (__linux__)
3244 /* Liang XXX: Darwin and Winnt checking should be added */
3245 if (lock->l_ast_data && lock->l_ast_data != data) {
3246 struct inode *new_inode = data;
3247 struct inode *old_inode = lock->l_ast_data;
/* a mismatch is only tolerable if the old inode is being freed */
3248 if (!(old_inode->i_state & I_FREEING))
3249 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3250 LASSERTF(old_inode->i_state & I_FREEING,
3251 "Found existing inode %p/%lu/%u state %lu in lock: "
3252 "setting data to %p/%lu/%u\n", old_inode,
3253 old_inode->i_ino, old_inode->i_generation,
3255 new_inode, new_inode->i_ino, new_inode->i_generation);
3258 lock->l_ast_data = data;
3259 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3260 unlock_res_and_lock(lock);
3261 LDLM_LOCK_PUT(lock);
/* Iterate all locks on this object's resource, applying @replace (an
 * ldlm_iterator_t) with @data — used to swap the ast-data on cached locks. */
3264 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3265 ldlm_iterator_t replace, void *data)
3267 struct ldlm_res_id res_id;
3268 struct obd_device *obd = class_exp2obd(exp);
3270 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3271 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3275 /* find any ldlm lock of the inode in osc
/* Like osc_change_cbdata() but reports whether the iterator stopped:
 * the LDLM_ITER_STOP / LDLM_ITER_CONTINUE result of the resource walk is
 * translated into the function's return value (translation lines elided
 * in this extract). */
3279 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3280 ldlm_iterator_t replace, void *data)
3282 struct ldlm_res_id res_id;
3283 struct obd_device *obd = class_exp2obd(exp);
3286 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3287 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3288 if (rc == LDLM_ITER_STOP)
3290 if (rc == LDLM_ITER_CONTINUE)
/* Finish an extent-lock enqueue: extract the real result from an intent
 * reply when the lock was aborted, log the returned LVB (kms/blocks/mtime),
 * add the lock to the client cache and invoke the caller's update callback. */
3295 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3296 struct obd_info *oinfo, int intent, int rc)
3301 /* The request was created before ldlm_cli_enqueue call. */
3302 if (rc == ELDLM_LOCK_ABORTED) {
3303 struct ldlm_reply *rep;
3305 /* swabbed by ldlm_cli_enqueue() */
3306 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3307 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3309 LASSERT(rep != NULL);
/* the intent handler's verdict overrides the abort status */
3310 if (rep->lock_policy_res1)
3311 rc = rep->lock_policy_res1;
3315 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3316 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3317 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3318 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3319 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3323 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3325 /* Call the update callback. */
3326 rc = oinfo->oi_cb_up(oinfo, rc);
/* Async-enqueue interpret callback: completes the ldlm enqueue, runs
 * osc_enqueue_fini(), and drops the reference on locks obtained by async
 * requests (per the policy described above osc_enqueue()). */
3330 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3333 struct osc_enqueue_args *aa = data;
3334 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3335 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3336 struct ldlm_lock *lock;
3338 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3340 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3342 /* Complete obtaining the lock procedure. */
3343 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3345 &aa->oa_oi->oi_flags,
3346 &lsm->lsm_oinfo[0]->loi_lvb,
3347 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3348 lustre_swab_ost_lvb,
3349 aa->oa_oi->oi_lockh, rc);
3351 /* Complete osc stuff. */
3352 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3354 /* Release the lock for async request. */
3355 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3356 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3358 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3359 aa->oa_oi->oi_lockh, req, aa);
3360 LDLM_LOCK_PUT(lock);
3364 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3365 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3366 * other synchronous requests, however keeping some locks and trying to obtain
3367 * others may take a considerable amount of time in a case of ost failure; and
3368 * when other sync requests do not get released lock from a client, the client
3369 * is excluded from the cluster -- such scenarios make the life difficult, so
3370 * release locks just after they are obtained. */
/* Obtain an extent lock on the object described by @oinfo: first try to
 * match an existing local lock (a PW lock satisfies a PR request), and only
 * enqueue a new lock — synchronously, or via @rqset for async — when no
 * match is found.  NOTE(review): some lines are elided in this extract. */
3371 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3372 struct ldlm_enqueue_info *einfo,
3373 struct ptlrpc_request_set *rqset)
3375 struct ldlm_res_id res_id;
3376 struct obd_device *obd = exp->exp_obd;
3377 struct ldlm_reply *rep;
3378 struct ptlrpc_request *req = NULL;
3379 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3384 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3385 oinfo->oi_md->lsm_object_gr, &res_id);
3386 /* Filesystem lock extents are extended to page boundaries so that
3387 * dealing with the page cache is a little smoother. */
3388 oinfo->oi_policy.l_extent.start -=
3389 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3390 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3392 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3395 /* Next, search for already existing extent locks that will cover us */
3396 /* If we're trying to read, we also search for an existing PW lock. The
3397 * VFS and page cache already protect us locally, so lots of readers/
3398 * writers can share a single PW lock.
3400 * There are problems with conversion deadlocks, so instead of
3401 * converting a read lock to a write lock, we'll just enqueue a new
3404 * At some point we should cancel the read lock instead of making them
3405 * send us a blocking callback, but there are problems with canceling
3406 * locks out from other users right now, too. */
3407 mode = einfo->ei_mode;
3408 if (einfo->ei_mode == LCK_PR)
3410 mode = ldlm_lock_match(obd->obd_namespace,
3411 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3412 einfo->ei_type, &oinfo->oi_policy, mode,
3415 /* addref the lock only if not async requests and PW lock is
3416 * matched whereas we asked for PR. */
3417 if (!rqset && einfo->ei_mode != mode)
3418 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3419 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3422 /* I would like to be able to ASSERT here that rss <=
3423 * kms, but I can't, for reasons which are explained in
3427 /* We already have a lock, and it's referenced */
3428 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3430 /* For async requests, decref the lock. */
3431 if (einfo->ei_mode != mode)
3432 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3434 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3442 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3443 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
3444 [DLM_LOCKREQ_OFF + 1] = 0 };
3446 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3450 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3451 size[DLM_REPLY_REC_OFF] =
3452 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3453 ptlrpc_req_set_repsize(req, 3, size);
3456 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3457 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3459 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3460 &oinfo->oi_policy, &oinfo->oi_flags,
3461 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3462 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3463 lustre_swab_ost_lvb, oinfo->oi_lockh,
3467 struct osc_enqueue_args *aa;
3468 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3469 aa = ptlrpc_req_async_args(req);
/* reply will be handled asynchronously by osc_enqueue_interpret() */
3474 req->rq_interpret_reply = osc_enqueue_interpret;
3475 ptlrpc_set_add_req(rqset, req);
3476 } else if (intent) {
3477 ptlrpc_req_finished(req);
3482 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3484 ptlrpc_req_finished(req);
/* Match an existing extent lock covering @policy on the object, extending
 * the extent to page boundaries first.  A PW lock can satisfy a PR request;
 * in that case (unless LDLM_FL_TEST_LOCK) the reference is converted from
 * PW to PR. */
3489 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3490 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3491 int *flags, void *data, struct lustre_handle *lockh,
3494 struct ldlm_res_id res_id;
3495 struct obd_device *obd = exp->exp_obd;
3496 int lflags = *flags;
3500 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3502 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3504 /* Filesystem lock extents are extended to page boundaries so that
3505 * dealing with the page cache is a little smoother */
3506 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3507 policy->l_extent.end |= ~CFS_PAGE_MASK;
3509 /* Next, search for already existing extent locks that will cover us */
3510 /* If we're trying to read, we also search for an existing PW lock. The
3511 * VFS and page cache already protect us locally, so lots of readers/
3512 * writers can share a single PW lock. */
3516 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3517 &res_id, type, policy, rc, lockh);
3519 osc_set_data_with_check(lockh, data, lflags);
3520 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3521 ldlm_lock_addref(lockh, LCK_PR);
3522 ldlm_lock_decref(lockh, LCK_PW);
3524 if (n_matches != NULL)
/*
 * osc_cancel(): release one reference on a held lock.  GROUP locks are
 * cancelled outright (decref_and_cancel); all other modes just decref
 * and let normal LRU/cancellation policy reclaim the lock later.
 * NOTE(review): extract elides lines; body shown is partial.
 */
3531 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3532 __u32 mode, struct lustre_handle *lockh, int flags,
3537 if (unlikely(mode == LCK_GROUP))
3538 ldlm_lock_decref_and_cancel(lockh, mode);
3540 ldlm_lock_decref(lockh, mode);
/*
 * osc_cancel_unused(): cancel unused client-side locks, restricted to
 * this object's resource when a resource name is built (resp set),
 * otherwise namespace-wide (resp presumably stays NULL when lsm is
 * NULL — elided lines make this unverifiable here).
 */
3545 static int osc_cancel_unused(struct obd_export *exp,
3546 struct lov_stripe_md *lsm, int flags, void *opaque)
3548 struct obd_device *obd = class_exp2obd(exp);
3549 struct ldlm_res_id res_id, *resp = NULL;
3552 resp = osc_build_res_name(lsm->lsm_object_id,
3553 lsm->lsm_object_gr, &res_id);
3556 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * osc_join_lru(): ask the DLM to join (or leave, per 'join') the LRU
 * list for locks on this object's resource.  Mirrors the structure of
 * osc_cancel_unused() above.
 * NOTE(review): extract elides lines; body shown is partial.
 */
3560 static int osc_join_lru(struct obd_export *exp,
3561 struct lov_stripe_md *lsm, int join)
3563 struct obd_device *obd = class_exp2obd(exp);
3564 struct ldlm_res_id res_id, *resp = NULL;
3567 resp = osc_build_res_name(lsm->lsm_object_id,
3568 lsm->lsm_object_gr, &res_id);
3571 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/*
 * osc_statfs_interpret(): async completion callback for OST_STATFS.
 * Unpacks the obd_statfs reply, refreshes the object-creator (oscc)
 * DEGRADED/RDONLY/NOSPC state flags from it, copies the stats to the
 * caller's buffer and invokes the caller's oi_cb_up callback.
 * NOTE(review): extract elides lines (error paths, RETURN); partial.
 */
3575 static int osc_statfs_interpret(struct ptlrpc_request *req,
3578 struct osc_async_args *aa = data;
3579 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3580 struct obd_statfs *msfs;
3585 /* The request has in fact never been sent
3586 * due to issues at a higher level (LOV).
3587 * Exit immediately since the caller is
3588 * aware of the problem and takes care
3589 * of the clean up */
3592 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3593 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3599 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3600 lustre_swab_obd_statfs);
3602 CERROR("Can't unpack obd_statfs\n");
3603 GOTO(out, rc = -EPROTO);
3606 /* Reinitialize the RDONLY and DEGRADED flags at the client
3607 * on each statfs, so they don't stay set permanently. */
3608 spin_lock(&cli->cl_oscc.oscc_lock);
3610 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3611 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3612 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3613 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3615 if (unlikely(msfs->os_state & OS_STATE_READONLY))
3616 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3617 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3618 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3620 /* Add a bit of hysteresis so this flag isn't continually flapping,
3621 * and ensure that new files don't get extremely fragmented due to
3622 * only a small amount of available space in the filesystem.
3623 * We want to set the NOSPC flag when there is less than ~0.1% free
3624 * and clear it when there is at least ~0.2% free space, so:
3625 * avail < ~0.1% max max = avail + used
3626 * 1025 * avail < avail + used used = blocks - free
3627 * 1024 * avail < used
3628 * 1024 * avail < blocks - free
3629 * avail < ((blocks - free) >> 10)
3631 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3632 * lose that amount of space so in those cases we report no space left
3633 * if there is less than 1 GB left. */
3634 used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3635 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3636 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3637 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3638 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3639 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3640 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3642 spin_unlock(&cli->cl_oscc.oscc_lock);
3644 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3646 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * osc_statfs_async(): fire an OST_STATFS request without blocking; the
 * reply is handled by osc_statfs_interpret() above via the request set.
 * NODELAY requests (procfs) are marked no_resend/no_delay so they
 * cannot deadlock waiting on a dead import.
 * NOTE(review): extract elides lines (NULL checks, RETURN); partial.
 */
3650 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3651 __u64 max_age, struct ptlrpc_request_set *rqset)
3653 struct ptlrpc_request *req;
3654 struct osc_async_args *aa;
3655 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3658 /* We could possibly pass max_age in the request (as an absolute
3659 * timestamp or a "seconds.usec ago") so the target can avoid doing
3660 * extra calls into the filesystem if that isn't necessary (e.g.
3661 * during mount that would help a bit). Having relative timestamps
3662 * is not so great if request processing is slow, while absolute
3663 * timestamps are not ideal because they need time synchronization. */
3664 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3665 OST_STATFS, 1, NULL, NULL);
3669 ptlrpc_req_set_repsize(req, 2, size);
3670 req->rq_request_portal = OST_CREATE_PORTAL;
3671 ptlrpc_at_set_req_timeout(req);
3672 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3673 /* procfs requests must not wait or resend, to avoid deadlock */
3674 req->rq_no_resend = 1;
3675 req->rq_no_delay = 1;
3678 req->rq_interpret_reply = osc_statfs_interpret;
3679 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3680 aa = ptlrpc_req_async_args(req);
3683 ptlrpc_set_add_req(rqset, req);
/*
 * osc_statfs(): synchronous OST_STATFS.  Takes a reference on the
 * import under cl_sem (guards against a racing disconnect, bug 15684),
 * queues the request, waits, and copies the unpacked reply into *osfs.
 * NOTE(review): extract elides lines (rc checks, out label, RETURN).
 */
3687 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3688 __u64 max_age, __u32 flags)
3690 struct obd_statfs *msfs;
3691 struct ptlrpc_request *req;
3692 struct obd_import *imp = NULL;
3693 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3697 /*Since the request might also come from lprocfs, so we need
3698 *sync this with client_disconnect_export Bug15684*/
3699 down_read(&obd->u.cli.cl_sem);
3700 if (obd->u.cli.cl_import)
3701 imp = class_import_get(obd->u.cli.cl_import);
3702 up_read(&obd->u.cli.cl_sem);
3706 /* We could possibly pass max_age in the request (as an absolute
3707 * timestamp or a "seconds.usec ago") so the target can avoid doing
3708 * extra calls into the filesystem if that isn't necessary (e.g.
3709 * during mount that would help a bit). Having relative timestamps
3710 * is not so great if request processing is slow, while absolute
3711 * timestamps are not ideal because they need time synchronization. */
3712 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3713 OST_STATFS, 1, NULL, NULL);
3715 class_import_put(imp);
3719 ptlrpc_req_set_repsize(req, 2, size);
3720 req->rq_request_portal = OST_CREATE_PORTAL;
3721 ptlrpc_at_set_req_timeout(req);
3723 if (flags & OBD_STATFS_NODELAY) {
3724 /* procfs requests must not wait or resend, to avoid deadlock */
3725 req->rq_no_resend = 1;
3726 req->rq_no_delay = 1;
3729 rc = ptlrpc_queue_wait(req);
3733 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3734 lustre_swab_obd_statfs);
3736 CERROR("Can't unpack obd_statfs\n");
3737 GOTO(out, rc = -EPROTO);
3740 memcpy(osfs, msfs, sizeof(*osfs));
3744 ptlrpc_req_finished(req);
3748 /* Retrieve object striping information.
3750 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3751 * the maximum number of OST indices which will fit in the user buffer.
3752 * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
/*
 * Copies a single-stripe lov_user_md (v1 or v3, per the magic the user
 * passed in) describing lsm back to user space, byte-swapping when the
 * in-core lsm is in swabbed order.
 * NOTE(review): extract elides lines (EFAULT/ENOMEM paths, RETURN).
 */
3754 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3756 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3757 struct lov_user_md_v3 lum, *lumk;
3758 int rc = 0, lum_size;
3759 struct lov_user_ost_data_v1 *lmm_objects;
3765 /* we only need the header part from user space to get lmm_magic and
3766 * lmm_stripe_count, (the header part is common to v1 and v3) */
3767 lum_size = sizeof(struct lov_user_md_v1);
3768 memset(&lum, 0x00, sizeof(lum));
3769 if (copy_from_user(&lum, lump, lum_size))
3772 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3773 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3776 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3777 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3778 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3779 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3781 /* we can use lov_mds_md_size() to compute lum_size
3782 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3783 if (lum.lmm_stripe_count > 0) {
3784 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3785 OBD_ALLOC(lumk, lum_size);
3788 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3789 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3791 lmm_objects = &(lumk->lmm_objects[0]);
3792 lmm_objects->l_object_id = lsm->lsm_object_id;
3794 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3798 lumk->lmm_magic = lum.lmm_magic;
3799 lumk->lmm_stripe_count = 1;
3800 lumk->lmm_object_id = lsm->lsm_object_id;
3802 if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
3803 (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
3804 /* lsm not in host order, so count also need be in same order */
3805 __swab32s(&lumk->lmm_magic);
3806 __swab16s(&lumk->lmm_stripe_count);
3807 lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
3808 if (lum.lmm_stripe_count > 0)
3809 lustre_swab_lov_user_md_objects(
3810 (struct lov_user_md_v1*)lumk);
3813 if (copy_to_user(lump, lumk, lum_size))
3817 OBD_FREE(lumk, lum_size);
/*
 * osc_iocontrol(): ioctl dispatcher for the OSC device.  Holds a module
 * reference for the duration of the call; supported commands include
 * LOV config queries, stripe get/set, import recovery/activation,
 * quota-check polling, object destroy (admin only) and target ping.
 * NOTE(review): extract elides lines (switch(cmd), break/RETURNs, the
 * buf declaration, etc.); body shown is partial.
 */
3823 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3824 void *karg, void *uarg)
3826 struct obd_device *obd = exp->exp_obd;
3827 struct obd_ioctl_data *data = karg;
3831 if (!try_module_get(THIS_MODULE)) {
3832 CERROR("Can't get module. Is it alive?");
3836 case OBD_IOC_LOV_GET_CONFIG: {
3838 struct lov_desc *desc;
3839 struct obd_uuid uuid;
3843 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3844 GOTO(out, err = -EINVAL);
3846 data = (struct obd_ioctl_data *)buf;
/* validate user-supplied buffer sizes before filling them in */
3848 if (sizeof(*desc) > data->ioc_inllen1) {
3849 obd_ioctl_freedata(buf, len);
3850 GOTO(out, err = -EINVAL);
3853 if (data->ioc_inllen2 < sizeof(uuid)) {
3854 obd_ioctl_freedata(buf, len);
3855 GOTO(out, err = -EINVAL);
/* OSC presents itself as a single-target, single-stripe "LOV" */
3858 desc = (struct lov_desc *)data->ioc_inlbuf1;
3859 desc->ld_tgt_count = 1;
3860 desc->ld_active_tgt_count = 1;
3861 desc->ld_default_stripe_count = 1;
3862 desc->ld_default_stripe_size = 0;
3863 desc->ld_default_stripe_offset = 0;
3864 desc->ld_pattern = 0;
3865 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3867 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3869 err = copy_to_user((void *)uarg, buf, len);
3872 obd_ioctl_freedata(buf, len);
3875 case LL_IOC_LOV_SETSTRIPE:
3876 err = obd_alloc_memmd(exp, karg);
3880 case LL_IOC_LOV_GETSTRIPE:
3881 err = osc_getstripe(karg, uarg);
3883 case OBD_IOC_CLIENT_RECOVER:
3884 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3889 case IOC_OSC_SET_ACTIVE:
3890 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3893 case OBD_IOC_POLL_QUOTACHECK:
3894 err = lquota_poll_check(quota_interface, exp,
3895 (struct if_quotacheck *)karg);
3897 case OBD_IOC_DESTROY: {
3900 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3901 GOTO (out, err = -EPERM);
3902 oa = &data->ioc_obdo1;
3905 GOTO(out, err = -EINVAL);
3907 oa->o_valid |= OBD_MD_FLGROUP;
3909 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3912 case OBD_IOC_PING_TARGET:
3913 err = ptlrpc_obd_ping(obd);
3916 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3917 cmd, cfs_curproc_comm());
3918 GOTO(out, err = -ENOTTY);
3921 module_put(THIS_MODULE);
/*
 * osc_get_info(): key/value query interface.  Handles, per key:
 *   KEY_LOCK_TO_STRIPE - stripe number for a lock (always stripe 0 here);
 *   KEY_OFF_RPCSIZE    - max RPC size in pages as a __u64;
 *   KEY_LAST_ID        - synchronous OST_GET_INFO for the last object id;
 *   KEY_FIEMAP         - synchronous OST_GET_INFO for extent mapping.
 * NOTE(review): extract elides lines (ENTRY/RETURN, rc checks, out
 * labels); body shown is partial.
 */
3925 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3926 void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3929 if (!vallen || !val)
3932 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3933 __u32 *stripe = val;
3934 *vallen = sizeof(*stripe);
3937 } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
3938 struct client_obd *cli = &exp->exp_obd->u.cli;
3939 __u64 *rpcsize = val;
3940 LASSERT(*vallen == sizeof(__u64));
3941 *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
3943 } else if (KEY_IS(KEY_LAST_ID)) {
3944 struct ptlrpc_request *req;
3946 char *bufs[2] = { NULL, key };
3947 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3950 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3951 OST_GET_INFO, 2, size, bufs);
3955 size[REPLY_REC_OFF] = *vallen;
3956 ptlrpc_req_set_repsize(req, 2, size);
3957 rc = ptlrpc_queue_wait(req);
3961 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3962 lustre_swab_ost_last_id);
3963 if (reply == NULL) {
3964 CERROR("Can't unpack OST last ID\n");
3965 GOTO(out, rc = -EPROTO);
3967 *((obd_id *)val) = *reply;
3969 ptlrpc_req_finished(req);
3971 } else if (KEY_IS(KEY_FIEMAP)) {
3972 struct ptlrpc_request *req;
3973 struct ll_user_fiemap *reply;
3974 char *bufs[2] = { NULL, key };
3975 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3978 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3979 OST_GET_INFO, 2, size, bufs);
3983 size[REPLY_REC_OFF] = *vallen;
3984 ptlrpc_req_set_repsize(req, 2, size);
3986 rc = ptlrpc_queue_wait(req);
3989 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3990 lustre_swab_fiemap);
3991 if (reply == NULL) {
3992 CERROR("Can't unpack FIEMAP reply.\n");
3993 GOTO(out1, rc = -EPROTO);
3996 memcpy(val, reply, *vallen);
3999 ptlrpc_req_finished(req);
/*
 * osc_setinfo_mds_conn_interpret(): completion callback for the
 * KEY_MDS_CONN set_info RPC.  Connects the MDS->OST origin llog
 * initiator and marks the import as server-timeout/pingable so the
 * MDS OSC keeps pinging the OST.
 * NOTE(review): extract elides lines (rc check, RETURN); partial.
 */
4007 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
4010 struct llog_ctxt *ctxt;
4011 struct obd_import *imp = req->rq_import;
4017 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4020 rc = llog_initiator_connect(ctxt);
4022 CERROR("cannot establish connection for "
4023 "ctxt %p: %d\n", ctxt, rc);
4026 llog_ctxt_put(ctxt);
4027 spin_lock(&imp->imp_lock);
4028 imp->imp_server_timeout = 1;
4029 imp->imp_pingable = 1;
4030 spin_unlock(&imp->imp_lock);
4031 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * osc_set_info_async(): key/value set interface.  Locally-handled keys
 * (NEXT_ID, INIT_RECOV, CHECKSUM) update client state and return; all
 * other keys are forwarded to the OST as an OST_SET_INFO RPC, with
 * special interpret callbacks for MDS_CONN and GRANT_SHRINK.
 * NOTE(review): extract elides lines (RETURNs, error paths, the oa
 * declaration in the GRANT_SHRINK branch); body shown is partial.
 */
4036 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4037 void *key, obd_count vallen, void *val,
4038 struct ptlrpc_request_set *set)
4040 struct ptlrpc_request *req;
4041 struct obd_device *obd = exp->exp_obd;
4042 struct obd_import *imp = class_exp2cliimp(exp);
4043 __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
4044 char *bufs[3] = { NULL, key, val };
4047 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4049 if (KEY_IS(KEY_NEXT_ID)) {
4051 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4053 if (vallen != sizeof(obd_id))
4056 /* avoid race between allocate new object and set next id
4057 * from ll_sync thread */
4058 spin_lock(&oscc->oscc_lock);
/* only ever move oscc_next_id forward */
4059 new_val = *((obd_id*)val) + 1;
4060 if (new_val > oscc->oscc_next_id)
4061 oscc->oscc_next_id = new_val;
4062 spin_unlock(&oscc->oscc_lock);
4064 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4065 exp->exp_obd->obd_name,
4066 oscc->oscc_next_id);
4071 if (KEY_IS(KEY_INIT_RECOV)) {
4072 if (vallen != sizeof(int))
4074 spin_lock(&imp->imp_lock);
4075 imp->imp_initial_recov = *(int *)val;
4076 spin_unlock(&imp->imp_lock);
4077 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
4078 exp->exp_obd->obd_name,
4079 imp->imp_initial_recov);
4083 if (KEY_IS(KEY_CHECKSUM)) {
4084 if (vallen != sizeof(int))
4086 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4090 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4093 /* We pass all other commands directly to OST. Since nobody calls osc
4094 methods directly and everybody is supposed to go through LOV, we
4095 assume lov checked invalid values for us.
4096 The only recognised values so far are evict_by_nid and mds_conn.
4097 Even if something bad goes through, we'd get a -EINVAL from OST
4100 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
4105 if (KEY_IS(KEY_MDS_CONN))
4106 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4107 else if (KEY_IS(KEY_GRANT_SHRINK))
4108 req->rq_interpret_reply = osc_shrink_grant_interpret;
4110 if (KEY_IS(KEY_GRANT_SHRINK)) {
4111 struct osc_grant_args *aa;
4114 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4115 aa = ptlrpc_req_async_args(req);
4118 ptlrpc_req_finished(req);
4121 *oa = ((struct ost_body *)val)->oa;
4125 ptlrpc_req_set_repsize(req, 2, size);
4126 ptlrpcd_add_req(req);
4128 ptlrpc_req_set_repsize(req, 1, NULL);
4129 ptlrpc_set_add_req(set, req);
4130 ptlrpc_check_set(set);
/* llog operation tables: size-replication context only needs cancel;
 * the MDS->OST origin ops are filled in at module init (osc_init). */
4137 static struct llog_operations osc_size_repl_logops = {
4138 lop_cancel: llog_obd_repl_cancel
4141 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * osc_llog_init(): set up the two llog contexts used by an MDS-side
 * OSC - the MDS->OST origin catalog (read from the on-disk CATLIST)
 * and the size-replication context - serialized by the disk obd's
 * obd_llog_cat_process mutex.  On error the catalog list entry is
 * written back.
 * NOTE(review): extract elides lines (labels, rc checks, RETURN).
 */
4142 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4145 struct llog_catid catid;
4146 static char name[32] = CATLIST;
4152 mutex_down(&disk_obd->obd_llog_cat_process);
4154 rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4156 CERROR("rc: %d\n", rc);
4157 GOTO(out_unlock, rc);
4160 CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4161 obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4162 catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4165 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4166 &catid.lci_logid, &osc_mds_ost_orig_logops);
4168 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4172 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4173 &osc_size_repl_logops);
4175 struct llog_ctxt *ctxt =
4176 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4179 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4183 CERROR("osc '%s' tgt '%s' rc=%d\n",
4184 obd->obd_name, disk_obd->obd_name, rc);
4185 CERROR("logid "LPX64":0x%x\n",
4186 catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
4188 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4191 CERROR("rc: %d\n", rc);
4194 mutex_up(&disk_obd->obd_llog_cat_process);
/*
 * osc_llog_finish(): tear down both llog contexts set up by
 * osc_llog_init().  Both cleanups are attempted; elided lines
 * presumably combine rc/rc2 into the final return.
 */
4199 static int osc_llog_finish(struct obd_device *obd, int count)
4201 struct llog_ctxt *ctxt;
4202 int rc = 0, rc2 = 0;
4205 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4207 rc = llog_cleanup(ctxt);
4209 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4211 rc2 = llog_cleanup(ctxt);
/*
 * osc_reconnect(): refresh the grant we ask the OST for on reconnect.
 * Under cl_loi_list_lock, request the currently-available grant (or a
 * 2-RPC minimum if none) and reset the lost-grant counter, logging the
 * old value.
 * NOTE(review): extract elides lines (lost_grant decl, RETURN).
 */
4218 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4219 struct obd_uuid *cluuid,
4220 struct obd_connect_data *data,
4223 struct client_obd *cli = &obd->u.cli;
4225 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4228 client_obd_list_lock(&cli->cl_loi_list_lock);
4229 data->ocd_grant = cli->cl_avail_grant ?:
4230 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4231 lost_grant = cli->cl_lost_grant;
4232 cli->cl_lost_grant = 0;
4233 client_obd_list_unlock(&cli->cl_loi_list_lock);
4235 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4236 "cl_lost_grant: %ld\n", data->ocd_grant,
4237 cli->cl_avail_grant, lost_grant);
4238 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4239 " ocd_grant: %d\n", data->ocd_connect_flags,
4240 data->ocd_version, data->ocd_grant);
/*
 * osc_disconnect(): flush size-replication llog cancels on the last
 * connection, disconnect the export, then (only once the import is
 * gone) remove this client from the grant-shrink list - see the
 * BUG18662 race diagram below for why the order matters.
 * NOTE(review): extract elides lines (ENTRY/RETURN, else branch).
 */
4246 static int osc_disconnect(struct obd_export *exp)
4248 struct obd_device *obd = class_exp2obd(exp);
4249 struct llog_ctxt *ctxt;
4252 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4254 if (obd->u.cli.cl_conn_count == 1) {
4255 /* Flush any remaining cancel messages out to the
4257 llog_sync(ctxt, exp);
4259 llog_ctxt_put(ctxt);
4261 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4265 rc = client_disconnect_export(exp);
4267 * Initially we put del_shrink_grant before disconnect_export, but it
4268 * causes the following problem if setup (connect) and cleanup
4269 * (disconnect) are tangled together.
4270 * connect p1 disconnect p2
4271 * ptlrpc_connect_import
4272 * ............... class_manual_cleanup
4275 * ptlrpc_connect_interrupt
4277 * add this client to shrink list
4279 * Bang! pinger trigger the shrink.
4280 * So the osc should be disconnected from the shrink list, after we
4281 * are sure the import has been destroyed. BUG18662
4283 if (obd->u.cli.cl_import == NULL)
4284 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_import_event(): react to import state transitions.
 *   DISCON     - MDS OSCs mark the object creator RECOVERING; grants
 *                are zeroed and async pinging disabled.
 *   INACTIVE   - notify the observer.
 *   INVALIDATE - fail queued pages and purge local DLM locks.
 *   ACTIVE     - MDS OSCs clear NOSPC; notify the observer.
 *   OCD        - (re)initialize grants and the request portal from the
 *                negotiated connect data, enable async pinging.
 * NOTE(review): extract elides lines (switch(event), breaks, RETURN).
 */
4288 static int osc_import_event(struct obd_device *obd,
4289 struct obd_import *imp,
4290 enum obd_import_event event)
4292 struct client_obd *cli;
4296 LASSERT(imp->imp_obd == obd);
4299 case IMP_EVENT_DISCON: {
4300 /* Only do this on the MDS OSC's */
4301 if (imp->imp_server_timeout) {
4302 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4304 spin_lock(&oscc->oscc_lock);
4305 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4306 spin_unlock(&oscc->oscc_lock);
4309 client_obd_list_lock(&cli->cl_loi_list_lock);
4310 cli->cl_avail_grant = 0;
4311 cli->cl_lost_grant = 0;
4312 client_obd_list_unlock(&cli->cl_loi_list_lock);
4313 ptlrpc_import_setasync(imp, -1);
4317 case IMP_EVENT_INACTIVE: {
4318 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4321 case IMP_EVENT_INVALIDATE: {
4322 struct ldlm_namespace *ns = obd->obd_namespace;
4326 client_obd_list_lock(&cli->cl_loi_list_lock);
4327 /* all pages go to failing rpcs due to the invalid import */
4328 osc_check_rpcs(cli);
4329 client_obd_list_unlock(&cli->cl_loi_list_lock);
4331 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4335 case IMP_EVENT_ACTIVE: {
4336 /* Only do this on the MDS OSC's */
4337 if (imp->imp_server_timeout) {
4338 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4340 spin_lock(&oscc->oscc_lock);
4341 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4342 spin_unlock(&oscc->oscc_lock);
4344 CDEBUG(D_INFO, "notify server \n");
4345 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4348 case IMP_EVENT_OCD: {
4349 struct obd_connect_data *ocd = &imp->imp_connect_data;
4351 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4352 osc_init_grant(&obd->u.cli, ocd);
4355 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4356 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4358 ptlrpc_import_setasync(imp, 1);
4359 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4363 CERROR("Unknown import event %d\n", event);
4369 /* determine whether the lock can be canceled before replaying the lock
4370 * during recovery, see bug16774 for detailed information
4373 * zero - the lock can't be canceled
4374 * other - ok to cancel
/*
 * Only unused PR/CR extent locks are safe to cancel instead of replay;
 * GROUP locks and non-extent resources are never cancelled here.
 * NOTE(review): extract elides the return statements themselves.
 */
4376 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4378 check_res_locked(lock->l_resource);
4379 if (lock->l_granted_mode == LCK_GROUP ||
4380 lock->l_resource->lr_type != LDLM_EXTENT)
4383 /* cancel all unused extent locks with granted mode LCK_PR or LCK_CR */
4384 if (lock->l_granted_mode == LCK_PR ||
4385 lock->l_granted_mode == LCK_CR)
4391 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4397 rc = ptlrpcd_addref();
4401 rc = client_obd_setup(obd, len, buf);
4405 struct lprocfs_static_vars lvars = { 0 };
4406 struct client_obd *cli = &obd->u.cli;
4408 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4409 lprocfs_osc_init_vars(&lvars);
4410 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4411 lproc_osc_attach_seqstat(obd);
4412 ptlrpc_lprocfs_register_obd(obd);
4416 /* We need to allocate a few requests more, because
4417 brw_interpret tries to create new requests before freeing
4418 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4419 reserved, but I afraid that might be too much wasted RAM
4420 in fact, so 2 is just my guess and still should work. */
4421 cli->cl_import->imp_rq_pool =
4422 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4424 ptlrpc_add_rqs_to_pool);
4425 cli->cl_cache = cache_create(obd);
4426 if (!cli->cl_cache) {
4430 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4431 sema_init(&cli->cl_grant_sem, 1);
4433 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/*
 * osc_precleanup(): staged teardown.  EARLY deactivates the import to
 * stop in-flight synchronization; EXPORTS destroys a never-connected
 * import (invalidate, free request pool, destroy) under cl_sem and
 * finishes the llog subsystems.
 * NOTE(review): extract elides lines (switch(stage), breaks, RETURN).
 */
4439 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4445 case OBD_CLEANUP_EARLY: {
4446 struct obd_import *imp;
4447 imp = obd->u.cli.cl_import;
4448 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4449 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4450 ptlrpc_deactivate_import(imp);
4453 case OBD_CLEANUP_EXPORTS: {
4454 /* If we set up but never connected, the
4455 client import will not have been cleaned. */
4456 down_write(&obd->u.cli.cl_sem);
4457 if (obd->u.cli.cl_import) {
4458 struct obd_import *imp;
4459 imp = obd->u.cli.cl_import;
4460 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4462 ptlrpc_invalidate_import(imp);
4463 if (imp->imp_rq_pool) {
4464 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4465 imp->imp_rq_pool = NULL;
4467 class_destroy_import(imp);
4468 obd->u.cli.cl_import = NULL;
4470 up_write(&obd->u.cli.cl_sem);
4472 rc = obd_llog_finish(obd, 0);
4474 CERROR("failed to cleanup llogging subsystems\n");
4477 case OBD_CLEANUP_SELF_EXP:
4479 case OBD_CLEANUP_OBD:
/*
 * osc_cleanup(): final device teardown - unregister procfs/lproc,
 * release the quota cache, destroy the page cache helper, and run the
 * generic client cleanup (elided lines presumably drop the ptlrpcd
 * reference and return rc).
 */
4485 int osc_cleanup(struct obd_device *obd)
4490 ptlrpc_lprocfs_unregister_obd(obd);
4491 lprocfs_obd_cleanup(obd);
4493 /* free memory of osc quota cache */
4494 lquota_cleanup(quota_interface, obd);
4496 cache_destroy(obd->u.cli.cl_cache);
4497 rc = client_obd_cleanup(obd);
/*
 * osc_register_page_removal_cb(): register page-removal and extent-pin
 * callbacks with the client cache; a no-op on server-side devices per
 * the comment below.
 * NOTE(review): extract elides lines; body shown is partial.
 */
4503 static int osc_register_page_removal_cb(struct obd_device *obd,
4504 obd_page_removal_cb_t func,
4505 obd_pin_extent_cb pin_cb)
4509 /* this server - not need init */
4513 return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
/* Counterpart of osc_register_page_removal_cb(): remove the callback
 * from the client cache. */
4517 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4518 obd_page_removal_cb_t func)
4521 return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
/*
 * osc_register_lock_cancel_cb(): install the (single) extent-lock
 * cancel callback; asserts none is already registered.  Server-side
 * devices skip initialization per the comment below.
 */
4524 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4525 obd_lock_cancel_cb cb)
4528 LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4530 /* this server - not need init */
4534 obd->u.cli.cl_ext_lock_cancel_cb = cb;
/*
 * osc_unregister_lock_cancel_cb(): clear the lock-cancel callback,
 * complaining (and presumably erroring - the return is elided) when
 * the caller tries to unregister a callback that was never installed.
 */
4538 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4539 obd_lock_cancel_cb cb)
4543 if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4544 CERROR("Unregistering cancel cb %p, while only %p was "
4546 obd->u.cli.cl_ext_lock_cancel_cb);
4550 obd->u.cli.cl_ext_lock_cancel_cb = NULL;
/*
 * osc_process_config(): forward on-line configuration records to the
 * lproc parameter handler for the PARAM_OSC namespace.
 * NOTE(review): extract elides lines; return statement not visible.
 */
4554 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4556 struct lustre_cfg *lcfg = buf;
4557 struct lprocfs_static_vars lvars = { 0 };
4560 lprocfs_osc_init_vars(&lvars);
4562 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
/*
 * osc_obd_ops: the OBD method table binding the OSC implementation into
 * the generic obd_device framework (setup/cleanup, connect, statfs,
 * object create/destroy, attributes, async page I/O, DLM lock
 * operations, ioctls, get/set_info, llog and cache callbacks).
 * NOTE(review): extract elides some entries and the closing brace.
 */
4566 struct obd_ops osc_obd_ops = {
4567 .o_owner = THIS_MODULE,
4568 .o_setup = osc_setup,
4569 .o_precleanup = osc_precleanup,
4570 .o_cleanup = osc_cleanup,
4571 .o_add_conn = client_import_add_conn,
4572 .o_del_conn = client_import_del_conn,
4573 .o_connect = client_connect_import,
4574 .o_reconnect = osc_reconnect,
4575 .o_disconnect = osc_disconnect,
4576 .o_statfs = osc_statfs,
4577 .o_statfs_async = osc_statfs_async,
4578 .o_packmd = osc_packmd,
4579 .o_unpackmd = osc_unpackmd,
4580 .o_precreate = osc_precreate,
4581 .o_create = osc_create,
4582 .o_create_async = osc_create_async,
4583 .o_destroy = osc_destroy,
4584 .o_getattr = osc_getattr,
4585 .o_getattr_async = osc_getattr_async,
4586 .o_setattr = osc_setattr,
4587 .o_setattr_async = osc_setattr_async,
4589 .o_brw_async = osc_brw_async,
4590 .o_prep_async_page = osc_prep_async_page,
4591 .o_get_lock = osc_get_lock,
4592 .o_queue_async_io = osc_queue_async_io,
4593 .o_set_async_flags = osc_set_async_flags,
4594 .o_queue_group_io = osc_queue_group_io,
4595 .o_trigger_group_io = osc_trigger_group_io,
4596 .o_teardown_async_page = osc_teardown_async_page,
4597 .o_punch = osc_punch,
4599 .o_enqueue = osc_enqueue,
4600 .o_match = osc_match,
4601 .o_change_cbdata = osc_change_cbdata,
4602 .o_find_cbdata = osc_find_cbdata,
4603 .o_cancel = osc_cancel,
4604 .o_cancel_unused = osc_cancel_unused,
4605 .o_join_lru = osc_join_lru,
4606 .o_iocontrol = osc_iocontrol,
4607 .o_get_info = osc_get_info,
4608 .o_set_info_async = osc_set_info_async,
4609 .o_import_event = osc_import_event,
4610 .o_llog_init = osc_llog_init,
4611 .o_llog_finish = osc_llog_finish,
4612 .o_process_config = osc_process_config,
4613 .o_register_page_removal_cb = osc_register_page_removal_cb,
4614 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4615 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4616 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
/*
 * osc_init(): module entry point.  Loads and hooks the quota interface,
 * registers the OSC obd type (releasing the quota symbol on failure),
 * and derives the MDS->OST origin llog ops from llog_lvfs_ops with
 * origin-specific setup/cleanup/add/connect handlers.
 * NOTE(review): extract elides lines (rc check, RETURN).
 */
4618 int __init osc_init(void)
4620 struct lprocfs_static_vars lvars = { 0 };
4624 lprocfs_osc_init_vars(&lvars);
4626 request_module("lquota");
4627 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4628 lquota_init(quota_interface);
4629 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4631 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4634 if (quota_interface)
4635 PORTAL_SYMBOL_PUT(osc_quota_interface);
4639 osc_mds_ost_orig_logops = llog_lvfs_ops;
4640 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4641 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4642 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4643 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/*
 * osc_exit(): module exit - shut down the quota interface, release its
 * symbol reference, and unregister the OSC obd type.
 */
4649 static void /*__exit*/ osc_exit(void)
4651 lquota_exit(quota_interface);
4652 if (quota_interface)
4653 PORTAL_SYMBOL_PUT(osc_quota_interface);
4655 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata and registration of osc_init/osc_exit via libcfs. */
4658 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4659 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4660 MODULE_LICENSE("GPL");
4662 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);