1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
78 atomic_t osc_resend_time;
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82 struct lov_stripe_md *lsm)
87 lmm_size = sizeof(**lmmp);
92 OBD_FREE(*lmmp, lmm_size);
98 OBD_ALLOC(*lmmp, lmm_size);
104 LASSERT(lsm->lsm_object_id);
105 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113 struct lov_mds_md *lmm, int lmm_bytes)
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == 0) {
127 CERROR("lov_mds_md: zero lmm_object_id\n");
132 lsm_size = lov_stripe_md_size(1);
136 if (*lsmp != NULL && lmm == NULL) {
137 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 OBD_FREE(*lsmp, lsm_size);
144 OBD_ALLOC(*lsmp, lsm_size);
147 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149 OBD_FREE(*lsmp, lsm_size);
152 loi_init((*lsmp)->lsm_oinfo[0]);
156 /* XXX zero *lsmp? */
157 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158 LASSERT((*lsmp)->lsm_object_id);
161 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
167 struct osc_async_args *aa, int rc)
169 struct ost_body *body;
175 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
176 lustre_swab_ost_body);
178 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
179 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
181 /* This should really be sent by the OST */
182 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
183 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
185 CERROR("can't unpack ost_body\n");
187 aa->aa_oi->oi_oa->o_valid = 0;
190 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
194 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
195 struct ptlrpc_request_set *set)
197 struct ptlrpc_request *req;
198 struct ost_body *body;
199 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
200 struct osc_async_args *aa;
203 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
204 OST_GETATTR, 2, size,NULL);
208 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
209 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
211 ptlrpc_req_set_repsize(req, 2, size);
212 req->rq_interpret_reply = osc_getattr_interpret;
214 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
215 aa = ptlrpc_req_async_args(req);
218 ptlrpc_set_add_req(set, req);
222 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
224 struct ptlrpc_request *req;
225 struct ost_body *body;
226 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
230 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
231 OST_GETATTR, 2, size, NULL);
235 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
236 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
238 ptlrpc_req_set_repsize(req, 2, size);
240 rc = ptlrpc_queue_wait(req);
242 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
246 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
247 lustre_swab_ost_body);
249 CERROR ("can't unpack ost_body\n");
250 GOTO (out, rc = -EPROTO);
253 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
254 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
256 /* This should really be sent by the OST */
257 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
258 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
262 ptlrpc_req_finished(req);
266 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
267 struct obd_trans_info *oti)
269 struct ptlrpc_request *req;
270 struct ost_body *body;
271 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
275 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
276 OST_SETATTR, 2, size, NULL);
280 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
281 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
283 ptlrpc_req_set_repsize(req, 2, size);
285 rc = ptlrpc_queue_wait(req);
289 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
290 lustre_swab_ost_body);
292 GOTO(out, rc = -EPROTO);
294 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
298 ptlrpc_req_finished(req);
302 static int osc_setattr_interpret(struct ptlrpc_request *req,
303 struct osc_async_args *aa, int rc)
305 struct ost_body *body;
311 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
312 lustre_swab_ost_body);
314 CERROR("can't unpack ost_body\n");
315 GOTO(out, rc = -EPROTO);
318 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
320 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
324 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
325 struct obd_trans_info *oti,
326 struct ptlrpc_request_set *rqset)
328 struct ptlrpc_request *req;
329 struct ost_body *body;
330 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
332 struct osc_async_args *aa;
335 if (osc_exp_is_2_0_server(exp)) {
339 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
340 OST_SETATTR, bufcount, size, NULL);
344 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
346 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
348 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
351 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
352 ptlrpc_req_set_repsize(req, 2, size);
353 /* do mds to ost setattr asynchronouly */
355 /* Do not wait for response. */
356 ptlrpcd_add_req(req);
358 req->rq_interpret_reply = osc_setattr_interpret;
360 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
361 aa = ptlrpc_req_async_args(req);
364 ptlrpc_set_add_req(rqset, req);
370 int osc_real_create(struct obd_export *exp, struct obdo *oa,
371 struct lov_stripe_md **ea, struct obd_trans_info *oti)
373 struct ptlrpc_request *req;
374 struct ost_body *body;
375 struct lov_stripe_md *lsm;
376 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
385 rc = obd_alloc_memmd(exp, &lsm);
390 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
391 OST_CREATE, 2, size, NULL);
393 GOTO(out, rc = -ENOMEM);
395 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
396 lustre_set_wire_obdo(&body->oa, oa);
398 ptlrpc_req_set_repsize(req, 2, size);
399 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400 oa->o_flags == OBD_FL_DELORPHAN) {
402 "delorphan from OST integration");
403 /* Don't resend the delorphan req */
404 req->rq_no_resend = req->rq_no_delay = 1;
407 rc = ptlrpc_queue_wait(req);
411 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
412 lustre_swab_ost_body);
414 CERROR ("can't unpack ost_body\n");
415 GOTO (out_req, rc = -EPROTO);
418 lustre_get_wire_obdo(oa, &body->oa);
420 /* This should really be sent by the OST */
421 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
422 oa->o_valid |= OBD_MD_FLBLKSZ;
424 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
425 * have valid lsm_oinfo data structs, so don't go touching that.
426 * This needs to be fixed in a big way.
428 lsm->lsm_object_id = oa->o_id;
432 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
434 if (oa->o_valid & OBD_MD_FLCOOKIE) {
435 if (!oti->oti_logcookies)
436 oti_alloc_cookies(oti, 1);
437 *oti->oti_logcookies = oa->o_lcookie;
441 CDEBUG(D_HA, "transno: "LPD64"\n",
442 lustre_msg_get_transno(req->rq_repmsg));
444 ptlrpc_req_finished(req);
447 obd_free_memmd(exp, &lsm);
451 static int osc_punch_interpret(struct ptlrpc_request *req,
452 struct osc_async_args *aa, int rc)
454 struct ost_body *body;
460 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
461 lustre_swab_ost_body);
463 CERROR ("can't unpack ost_body\n");
464 GOTO(out, rc = -EPROTO);
467 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
469 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
473 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
474 struct obd_trans_info *oti,
475 struct ptlrpc_request_set *rqset)
477 struct ptlrpc_request *req;
478 struct osc_async_args *aa;
479 struct ost_body *body;
480 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
488 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
489 OST_PUNCH, 2, size, NULL);
493 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
494 ptlrpc_at_set_req_timeout(req);
496 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
497 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
499 /* overload the size and blocks fields in the oa with start/end */
500 body->oa.o_size = oinfo->oi_policy.l_extent.start;
501 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
502 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
504 ptlrpc_req_set_repsize(req, 2, size);
506 req->rq_interpret_reply = osc_punch_interpret;
507 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
508 aa = ptlrpc_req_async_args(req);
510 ptlrpc_set_add_req(rqset, req);
515 static int osc_sync_interpret(struct ptlrpc_request *req,
516 struct osc_async_args *aa, int rc)
518 struct ost_body *body;
524 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
525 lustre_swab_ost_body);
527 CERROR ("can't unpack ost_body\n");
528 GOTO(out, rc = -EPROTO);
531 *aa->aa_oi->oi_oa = body->oa;
533 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
537 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
538 obd_size start, obd_size end,
539 struct ptlrpc_request_set *set)
541 struct ptlrpc_request *req;
542 struct ost_body *body;
543 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
544 struct osc_async_args *aa;
552 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
553 OST_SYNC, 2, size, NULL);
557 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
558 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
560 /* overload the size and blocks fields in the oa with start/end */
561 body->oa.o_size = start;
562 body->oa.o_blocks = end;
563 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
565 ptlrpc_req_set_repsize(req, 2, size);
566 req->rq_interpret_reply = osc_sync_interpret;
568 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
569 aa = ptlrpc_req_async_args(req);
572 ptlrpc_set_add_req(set, req);
576 /* Find and cancel locally locks matched by @mode in the resource found by
577 * @objid. Found locks are added into @cancel list. Returns the amount of
578 * locks added to @cancels list. */
579 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
580 struct list_head *cancels, ldlm_mode_t mode,
583 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
584 struct ldlm_res_id res_id;
585 struct ldlm_resource *res;
589 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
590 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
594 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
595 lock_flags, 0, NULL);
596 ldlm_resource_putref(res);
600 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
603 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
605 atomic_dec(&cli->cl_destroy_in_flight);
606 cfs_waitq_signal(&cli->cl_destroy_waitq);
610 static int osc_can_send_destroy(struct client_obd *cli)
612 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
613 cli->cl_max_rpcs_in_flight) {
614 /* The destroy request can be sent */
617 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
618 cli->cl_max_rpcs_in_flight) {
620 * The counter has been modified between the two atomic
623 cfs_waitq_signal(&cli->cl_destroy_waitq);
628 /* Destroy requests can be async always on the client, and we don't even really
629 * care about the return code since the client cannot do anything at all about
631 * When the MDS is unlinking a filename, it saves the file objects into a
632 * recovery llog, and these object records are cancelled when the OST reports
633 * they were destroyed and sync'd to disk (i.e. transaction committed).
634 * If the client dies, or the OST is down when the object should be destroyed,
635 * the records are not cancelled, and when the OST reconnects to the MDS next,
636 * it will retrieve the llog unlink logs and then sends the log cancellation
637 * cookies to the MDS after committing destroy transactions. */
638 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
639 struct lov_stripe_md *ea, struct obd_trans_info *oti,
640 struct obd_export *md_export)
642 CFS_LIST_HEAD(cancels);
643 struct ptlrpc_request *req;
644 struct ost_body *body;
645 __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
646 sizeof(struct ldlm_request) };
647 int count, bufcount = 2;
648 struct client_obd *cli = &exp->exp_obd->u.cli;
656 LASSERT(oa->o_id != 0);
658 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
659 LDLM_FL_DISCARD_DATA);
660 if (exp_connect_cancelset(exp))
662 req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
663 size, REQ_REC_OFF + 1, 0, &cancels, count);
667 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
668 ptlrpc_at_set_req_timeout(req);
670 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
672 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
673 oa->o_lcookie = *oti->oti_logcookies;
676 lustre_set_wire_obdo(&body->oa, oa);
677 ptlrpc_req_set_repsize(req, 2, size);
679 /* don't throttle destroy RPCs for the MDT */
680 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
681 req->rq_interpret_reply = osc_destroy_interpret;
682 if (!osc_can_send_destroy(cli)) {
683 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
687 * Wait until the number of on-going destroy RPCs drops
688 * under max_rpc_in_flight
690 l_wait_event_exclusive(cli->cl_destroy_waitq,
691 osc_can_send_destroy(cli), &lwi);
695 /* Do not wait for response */
696 ptlrpcd_add_req(req);
700 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
703 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
705 LASSERT(!(oa->o_valid & bits));
708 client_obd_list_lock(&cli->cl_loi_list_lock);
709 oa->o_dirty = cli->cl_dirty;
710 if (cli->cl_dirty > cli->cl_dirty_max) {
711 CERROR("dirty %lu > dirty_max %lu\n",
712 cli->cl_dirty, cli->cl_dirty_max);
714 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
715 CERROR("dirty %d > system dirty_max %d\n",
716 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
718 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
719 CERROR("dirty %lu - dirty_max %lu too big???\n",
720 cli->cl_dirty, cli->cl_dirty_max);
723 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
724 (cli->cl_max_rpcs_in_flight + 1);
725 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
727 oa->o_grant = cli->cl_avail_grant;
728 oa->o_dropped = cli->cl_lost_grant;
729 cli->cl_lost_grant = 0;
730 client_obd_list_unlock(&cli->cl_loi_list_lock);
731 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
732 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
736 static void osc_update_next_shrink(struct client_obd *cli)
738 cli->cl_next_shrink_grant =
739 cfs_time_shift(cli->cl_grant_shrink_interval);
740 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
741 cli->cl_next_shrink_grant);
744 /* caller must hold loi_list_lock */
745 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
747 atomic_inc(&obd_dirty_pages);
748 cli->cl_dirty += CFS_PAGE_SIZE;
749 cli->cl_avail_grant -= CFS_PAGE_SIZE;
750 pga->flag |= OBD_BRW_FROM_GRANT;
751 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
752 CFS_PAGE_SIZE, pga, pga->pg);
753 LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
754 cli->cl_avail_grant);
755 osc_update_next_shrink(cli);
758 /* the companion to osc_consume_write_grant, called when a brw has completed.
759 * must be called with the loi lock held. */
760 static void osc_release_write_grant(struct client_obd *cli,
761 struct brw_page *pga, int sent)
763 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
766 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
771 pga->flag &= ~OBD_BRW_FROM_GRANT;
772 atomic_dec(&obd_dirty_pages);
773 cli->cl_dirty -= CFS_PAGE_SIZE;
775 cli->cl_lost_grant += CFS_PAGE_SIZE;
776 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
777 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
778 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
779 /* For short writes we shouldn't count parts of pages that
780 * span a whole block on the OST side, or our accounting goes
781 * wrong. Should match the code in filter_grant_check. */
782 int offset = pga->off & ~CFS_PAGE_MASK;
783 int count = pga->count + (offset & (blocksize - 1));
784 int end = (offset + pga->count) & (blocksize - 1);
786 count += blocksize - end;
788 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
789 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
790 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
791 cli->cl_avail_grant, cli->cl_dirty);
797 static unsigned long rpcs_in_flight(struct client_obd *cli)
799 return cli->cl_r_in_flight + cli->cl_w_in_flight;
802 /* caller must hold loi_list_lock */
803 void osc_wake_cache_waiters(struct client_obd *cli)
805 struct list_head *l, *tmp;
806 struct osc_cache_waiter *ocw;
809 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
810 /* if we can't dirty more, we must wait until some is written */
811 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
812 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
813 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
814 "osc max %ld, sys max %d\n", cli->cl_dirty,
815 cli->cl_dirty_max, obd_max_dirty_pages);
819 /* if still dirty cache but no grant wait for pending RPCs that
820 * may yet return us some grant before doing sync writes */
821 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
822 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
823 cli->cl_w_in_flight);
827 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
828 list_del_init(&ocw->ocw_entry);
829 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
830 /* no more RPCs in flight to return grant, do sync IO */
831 ocw->ocw_rc = -EDQUOT;
832 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
834 osc_consume_write_grant(cli,
835 &ocw->ocw_oap->oap_brw_page);
838 cfs_waitq_signal(&ocw->ocw_waitq);
844 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
846 client_obd_list_lock(&cli->cl_loi_list_lock);
847 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
848 if (body->oa.o_valid & OBD_MD_FLGRANT)
849 cli->cl_avail_grant += body->oa.o_grant;
850 /* waiters are woken in brw_interpret */
851 client_obd_list_unlock(&cli->cl_loi_list_lock);
854 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
855 void *key, obd_count vallen, void *val,
856 struct ptlrpc_request_set *set);
858 static int osc_shrink_grant_interpret(struct ptlrpc_request *req,
859 struct osc_grant_args *aa, int rc)
861 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
862 struct obdo *oa = aa->aa_oa;
863 struct ost_body *body;
866 client_obd_list_lock(&cli->cl_loi_list_lock);
867 cli->cl_avail_grant += oa->o_grant;
868 client_obd_list_unlock(&cli->cl_loi_list_lock);
871 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oa),
872 lustre_swab_ost_body);
873 osc_update_grant(cli, body);
879 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
881 client_obd_list_lock(&cli->cl_loi_list_lock);
882 oa->o_grant = cli->cl_avail_grant / 4;
883 cli->cl_avail_grant -= oa->o_grant;
884 client_obd_list_unlock(&cli->cl_loi_list_lock);
885 oa->o_flags |= OBD_FL_SHRINK_GRANT;
886 osc_update_next_shrink(cli);
889 /* Shrink the current grant, either from some large amount to enough for a
890 * full set of in-flight RPCs, or if we have already shrunk to that limit
891 * then to enough for a single RPC. This avoids keeping more grant than
892 * needed, and avoids shrinking the grant piecemeal. */
893 static int osc_shrink_grant(struct client_obd *cli)
895 long target = (cli->cl_max_rpcs_in_flight + 1) *
896 cli->cl_max_pages_per_rpc;
898 client_obd_list_lock(&cli->cl_loi_list_lock);
899 if (cli->cl_avail_grant <= target)
900 target = cli->cl_max_pages_per_rpc;
901 client_obd_list_unlock(&cli->cl_loi_list_lock);
903 return osc_shrink_grant_to_target(cli, target);
906 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
909 struct ost_body *body;
912 client_obd_list_lock(&cli->cl_loi_list_lock);
913 /* Don't shrink if we are already above or below the desired limit
914 * We don't want to shrink below a single RPC, as that will negatively
915 * impact block allocation and long-term performance. */
916 if (target < cli->cl_max_pages_per_rpc)
917 target = cli->cl_max_pages_per_rpc;
919 if (target >= cli->cl_avail_grant) {
920 client_obd_list_unlock(&cli->cl_loi_list_lock);
923 client_obd_list_unlock(&cli->cl_loi_list_lock);
929 osc_announce_cached(cli, &body->oa, 0);
931 client_obd_list_lock(&cli->cl_loi_list_lock);
932 body->oa.o_grant = cli->cl_avail_grant - target;
933 cli->cl_avail_grant = target;
934 client_obd_list_unlock(&cli->cl_loi_list_lock);
935 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
936 osc_update_next_shrink(cli);
938 rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
939 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
940 sizeof(*body), body, NULL);
942 client_obd_list_lock(&cli->cl_loi_list_lock);
943 cli->cl_avail_grant += body->oa.o_grant;
944 client_obd_list_unlock(&cli->cl_loi_list_lock);
950 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
951 static int osc_should_shrink_grant(struct client_obd *client)
953 cfs_time_t time = cfs_time_current();
954 cfs_time_t next_shrink = client->cl_next_shrink_grant;
955 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
956 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
957 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
960 osc_update_next_shrink(client);
965 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
967 struct client_obd *client;
969 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
970 if (osc_should_shrink_grant(client))
971 osc_shrink_grant(client);
976 static int osc_add_shrink_grant(struct client_obd *client)
980 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
982 osc_grant_shrink_grant_cb, NULL,
983 &client->cl_grant_shrink_list);
985 CERROR("add grant client %s error %d\n",
986 client->cl_import->imp_obd->obd_name, rc);
989 CDEBUG(D_CACHE, "add grant client %s \n",
990 client->cl_import->imp_obd->obd_name);
991 osc_update_next_shrink(client);
995 static int osc_del_shrink_grant(struct client_obd *client)
997 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1001 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1003 client_obd_list_lock(&cli->cl_loi_list_lock);
1004 cli->cl_avail_grant = ocd->ocd_grant;
1005 client_obd_list_unlock(&cli->cl_loi_list_lock);
1007 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1008 list_empty(&cli->cl_grant_shrink_list))
1009 osc_add_shrink_grant(cli);
1011 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1012 cli->cl_avail_grant, cli->cl_lost_grant);
1013 LASSERT(cli->cl_avail_grant >= 0);
1016 /* We assume that the reason this OSC got a short read is because it read
1017 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1018 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1019 * this stripe never got written at or beyond this stripe offset yet. */
1020 static void handle_short_read(int nob_read, obd_count page_count,
1021 struct brw_page **pga)
1026 /* skip bytes read OK */
1027 while (nob_read > 0) {
1028 LASSERT (page_count > 0);
1030 if (pga[i]->count > nob_read) {
1031 /* EOF inside this page */
1032 ptr = cfs_kmap(pga[i]->pg) +
1033 (pga[i]->off & ~CFS_PAGE_MASK);
1034 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1035 cfs_kunmap(pga[i]->pg);
1041 nob_read -= pga[i]->count;
1046 /* zero remaining pages */
1047 while (page_count-- > 0) {
1048 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1049 memset(ptr, 0, pga[i]->count);
1050 cfs_kunmap(pga[i]->pg);
1055 static int check_write_rcs(struct ptlrpc_request *req,
1056 int requested_nob, int niocount,
1057 obd_count page_count, struct brw_page **pga)
1061 /* return error if any niobuf was in error */
1062 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1063 sizeof(*remote_rcs) * niocount, NULL);
1064 if (remote_rcs == NULL) {
1065 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
1068 if (lustre_rep_need_swab(req))
1069 for (i = 0; i < niocount; i++)
1070 __swab32s(&remote_rcs[i]);
1072 for (i = 0; i < niocount; i++) {
1073 if (remote_rcs[i] < 0)
1074 return(remote_rcs[i]);
1076 if (remote_rcs[i] != 0) {
1077 CERROR("rc[%d] invalid (%d) req %p\n",
1078 i, remote_rcs[i], req);
1083 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1084 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1085 req->rq_bulk->bd_nob_transferred, requested_nob);
1092 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1094 if (p1->flag != p2->flag) {
1095 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
1097 /* warn if we try to combine flags that we don't know to be
1098 * safe to combine */
1099 if ((p1->flag & mask) != (p2->flag & mask))
1100 CERROR("is it ok to have flags 0x%x and 0x%x in the "
1101 "same brw?\n", p1->flag, p2->flag);
1105 return (p1->off + p1->count == p2->off);
1108 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1109 struct brw_page **pga, int opc,
1110 cksum_type_t cksum_type, int pshift)
1115 LASSERT (pg_count > 0);
1116 cksum = init_checksum(cksum_type);
1117 while (nob > 0 && pg_count > 0) {
1118 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1119 int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK;
1120 int count = pga[i]->count > nob ? nob : pga[i]->count;
1122 /* corrupt the data before we compute the checksum, to
1123 * simulate an OST->client data error */
1124 if (i == 0 && opc == OST_READ &&
1125 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1126 memcpy(ptr + off, "bad1", min(4, nob));
1127 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1128 cfs_kunmap(pga[i]->pg);
1129 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1132 nob -= pga[i]->count;
1136 /* For sending we only compute the wrong checksum instead
1137 * of corrupting the data so it is still correct on a redo */
1138 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
1144 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1145 struct lov_stripe_md *lsm, obd_count page_count,
1146 struct brw_page **pga,
1147 struct ptlrpc_request **reqp, int pshift)
1149 struct ptlrpc_request *req;
1150 struct ptlrpc_bulk_desc *desc;
1151 struct ost_body *body;
1152 struct obd_ioobj *ioobj;
1153 struct niobuf_remote *niobuf;
1154 __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1155 int niocount, i, requested_nob, opc, rc;
1156 struct ptlrpc_request_pool *pool;
1157 struct osc_brw_async_args *aa;
1158 struct brw_page *pg_prev;
1161 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1162 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
1164 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1165 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1167 for (niocount = i = 1; i < page_count; i++) {
1168 if (!can_merge_pages(pga[i - 1], pga[i]))
1172 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1173 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1175 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1180 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1181 ptlrpc_at_set_req_timeout(req);
1183 if (opc == OST_WRITE)
1184 desc = ptlrpc_prep_bulk_imp (req, page_count,
1185 BULK_GET_SOURCE, OST_BULK_PORTAL);
1187 desc = ptlrpc_prep_bulk_imp (req, page_count,
1188 BULK_PUT_SINK, OST_BULK_PORTAL);
1190 GOTO(out, rc = -ENOMEM);
1191 /* NB request now owns desc and will free it when it gets freed */
1193 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1194 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1195 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1196 niocount * sizeof(*niobuf));
1198 lustre_set_wire_obdo(&body->oa, oa);
1199 obdo_to_ioobj(oa, ioobj);
1200 ioobj->ioo_bufcnt = niocount;
1202 LASSERT (page_count > 0);
1204 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1205 struct brw_page *pg = pga[i];
1207 LASSERT(pg->count > 0);
1208 LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) +
1209 pg->count <= CFS_PAGE_SIZE,
1210 "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n",
1211 i, pg, pg->off, pg->count, pshift);
1213 LASSERTF(i == 0 || pg->off > pg_prev->off,
1214 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1215 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1217 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1218 pg_prev->pg, page_private(pg_prev->pg),
1219 pg_prev->pg->index, pg_prev->off);
1221 LASSERTF(i == 0 || pg->off > pg_prev->off,
1222 "i %d p_c %u\n", i, page_count);
1224 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1225 (pg->flag & OBD_BRW_SRVLOCK));
1227 ptlrpc_prep_bulk_page(desc, pg->pg,
1228 OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK,
1230 requested_nob += pg->count;
1232 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1234 niobuf->len += pg->count;
1236 niobuf->offset = pg->off;
1237 niobuf->len = pg->count;
1238 niobuf->flags = pg->flag;
1243 LASSERTF((void *)(niobuf - niocount) ==
1244 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1245 niocount * sizeof(*niobuf)),
1246 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1247 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1248 (void *)(niobuf - niocount));
1250 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1251 if (osc_should_shrink_grant(cli))
1252 osc_shrink_grant_local(cli, &body->oa);
1254 /* size[REQ_REC_OFF] still sizeof (*body) */
1255 if (opc == OST_WRITE) {
1256 if (cli->cl_checksum) {
1257 /* store cl_cksum_type in a local variable since
1258 * it can be changed via lprocfs */
1259 cksum_type_t cksum_type = cli->cl_cksum_type;
1261 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1262 oa->o_flags &= OBD_FL_LOCAL_MASK;
1263 body->oa.o_flags = 0;
1265 body->oa.o_flags |= cksum_type_pack(cksum_type);
1266 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1267 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1270 cksum_type, pshift);
1271 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1273 /* save this in 'oa', too, for later checking */
1274 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1275 oa->o_flags |= cksum_type_pack(cksum_type);
1277 /* clear out the checksum flag, in case this is a
1278 * resend but cl_checksum is no longer set. b=11238 */
1279 oa->o_valid &= ~OBD_MD_FLCKSUM;
1281 oa->o_cksum = body->oa.o_cksum;
1282 /* 1 RC per niobuf */
1283 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1284 ptlrpc_req_set_repsize(req, 3, size);
1286 if (cli->cl_checksum) {
1287 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1288 body->oa.o_flags = 0;
1289 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1290 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1292 /* 1 RC for the whole I/O */
1293 ptlrpc_req_set_repsize(req, 2, size);
1296 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1297 aa = ptlrpc_req_async_args(req);
1299 aa->aa_requested_nob = requested_nob;
1300 aa->aa_nio_count = niocount;
1301 aa->aa_page_count = page_count;
1305 aa->aa_pshift = pshift;
1306 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1312 ptlrpc_req_finished (req);
1316 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1317 __u32 client_cksum, __u32 server_cksum, int nob,
1318 obd_count page_count, struct brw_page **pga,
1319 cksum_type_t client_cksum_type, int pshift)
1323 cksum_type_t cksum_type;
1325 if (server_cksum == client_cksum) {
1326 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1330 if (oa->o_valid & OBD_MD_FLFLAGS)
1331 cksum_type = cksum_type_unpack(oa->o_flags);
1333 cksum_type = OBD_CKSUM_CRC32;
1335 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1336 cksum_type, pshift);
1338 if (cksum_type != client_cksum_type)
1339 msg = "the server did not use the checksum type specified in "
1340 "the original request - likely a protocol problem";
1341 else if (new_cksum == server_cksum)
1342 msg = "changed on the client after we checksummed it - "
1343 "likely false positive due to mmap IO (bug 11742)";
1344 else if (new_cksum == client_cksum)
1345 msg = "changed in transit before arrival at OST";
1347 msg = "changed in transit AND doesn't match the original - "
1348 "likely false positive due to mmap IO (bug 11742)";
1350 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1351 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1352 "["LPU64"-"LPU64"]\n",
1353 msg, libcfs_nid2str(peer->nid),
1354 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1355 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1358 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1360 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1361 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1362 "client csum now %x\n", client_cksum, client_cksum_type,
1363 server_cksum, cksum_type, new_cksum);
1368 /* Note rc enters this function as number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1371 struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1372 const lnet_process_id_t *peer =
1373 &req->rq_import->imp_connection->c_peer;
1374 struct client_obd *cli = aa->aa_cli;
1375 struct ost_body *body;
1376 __u32 client_cksum = 0;
1379 if (rc < 0 && rc != -EDQUOT)
1382 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1383 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1384 lustre_swab_ost_body);
1386 CERROR ("Can't unpack body\n");
1390 /* set/clear over quota flag for a uid/gid */
1391 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1392 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1393 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1394 body->oa.o_gid, body->oa.o_valid,
1400 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1401 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1403 osc_update_grant(cli, body);
1405 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1407 CERROR ("Unexpected +ve rc %d\n", rc);
1410 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1412 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1413 check_write_checksum(&body->oa, peer, client_cksum,
1414 body->oa.o_cksum, aa->aa_requested_nob,
1415 aa->aa_page_count, aa->aa_ppga,
1416 cksum_type_unpack(aa->aa_oa->o_flags),
1420 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1421 aa->aa_page_count, aa->aa_ppga);
1425 /* The rest of this function executes only for OST_READs */
1426 if (rc > aa->aa_requested_nob) {
1427 CERROR("Unexpected rc %d (%d requested)\n", rc,
1428 aa->aa_requested_nob);
1432 if (rc != req->rq_bulk->bd_nob_transferred) {
1433 CERROR ("Unexpected rc %d (%d transferred)\n",
1434 rc, req->rq_bulk->bd_nob_transferred);
1438 if (rc < aa->aa_requested_nob)
1439 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1441 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1442 static int cksum_counter;
1443 __u32 server_cksum = body->oa.o_cksum;
1446 cksum_type_t cksum_type;
1448 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1449 cksum_type = cksum_type_unpack(body->oa.o_flags);
1451 cksum_type = OBD_CKSUM_CRC32;
1452 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1453 aa->aa_ppga, OST_READ,
1454 cksum_type, aa->aa_pshift);
1456 if (peer->nid == req->rq_bulk->bd_sender) {
1460 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1463 if (server_cksum == ~0 && rc > 0) {
1464 CERROR("Protocol error: server %s set the 'checksum' "
1465 "bit, but didn't send a checksum. Not fatal, "
1466 "but please notify on http://bugzilla.lustre.org/\n",
1467 libcfs_nid2str(peer->nid));
1468 } else if (server_cksum != client_cksum) {
1469 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1470 "%s%s%s inum "LPU64"/"LPU64" object "
1471 LPU64"/"LPU64" extent "
1472 "["LPU64"-"LPU64"]\n",
1473 req->rq_import->imp_obd->obd_name,
1474 libcfs_nid2str(peer->nid),
1476 body->oa.o_valid & OBD_MD_FLFID ?
1477 body->oa.o_fid : (__u64)0,
1478 body->oa.o_valid & OBD_MD_FLFID ?
1479 body->oa.o_generation :(__u64)0,
1481 body->oa.o_valid & OBD_MD_FLGROUP ?
1482 body->oa.o_gr : (__u64)0,
1483 aa->aa_ppga[0]->off,
1484 aa->aa_ppga[aa->aa_page_count-1]->off +
1485 aa->aa_ppga[aa->aa_page_count-1]->count -
1487 CERROR("client %x, server %x, cksum_type %x\n",
1488 client_cksum, server_cksum, cksum_type);
1490 aa->aa_oa->o_cksum = client_cksum;
1494 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1497 } else if (unlikely(client_cksum)) {
1498 static int cksum_missed;
1501 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1502 CERROR("Checksum %u requested from %s but not sent\n",
1503 cksum_missed, libcfs_nid2str(peer->nid));
1509 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1514 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1515 struct lov_stripe_md *lsm,
1516 obd_count page_count, struct brw_page **pga)
1518 struct ptlrpc_request *request;
1522 struct l_wait_info lwi;
1525 init_waitqueue_head(&waitq);
1528 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1529 page_count, pga, &request, 0);
1533 rc = ptlrpc_queue_wait(request);
1535 if (rc == -ETIMEDOUT && request->rq_resend) {
1536 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1537 ptlrpc_req_finished(request);
1541 rc = osc_brw_fini_request(request, rc);
1543 ptlrpc_req_finished(request);
1544 if (osc_recoverable_error(rc)) {
1546 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1547 CERROR("too many resend retries, returning error\n");
1551 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1552 l_wait_event(waitq, 0, &lwi);
1559 int osc_brw_redo_request(struct ptlrpc_request *request,
1560 struct osc_brw_async_args *aa)
1562 struct ptlrpc_request *new_req;
1563 struct ptlrpc_request_set *set = request->rq_set;
1564 struct osc_brw_async_args *new_aa;
1565 struct osc_async_page *oap;
1569 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1570 CERROR("too many resend retries, returning error\n");
1574 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1576 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1577 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1578 aa->aa_cli, aa->aa_oa,
1579 NULL /* lsm unused by osc currently */,
1580 aa->aa_page_count, aa->aa_ppga, &new_req,
1585 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1587 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1588 if (oap->oap_request != NULL) {
1589 LASSERTF(request == oap->oap_request,
1590 "request %p != oap_request %p\n",
1591 request, oap->oap_request);
1592 if (oap->oap_interrupted) {
1593 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1594 ptlrpc_req_finished(new_req);
1599 /* New request takes over pga and oaps from old request.
1600 * Note that copying a list_head doesn't work, need to move it... */
1602 new_req->rq_interpret_reply = request->rq_interpret_reply;
1603 new_req->rq_async_args = request->rq_async_args;
1604 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1606 new_aa = ptlrpc_req_async_args(new_req);
1608 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1609 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1610 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1612 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1613 if (oap->oap_request) {
1614 ptlrpc_req_finished(oap->oap_request);
1615 oap->oap_request = ptlrpc_request_addref(new_req);
1619 /* use ptlrpc_set_add_req is safe because interpret functions work
1620 * in check_set context. only one way exist with access to request
1621 * from different thread got -EINTR - this way protected with
1622 * cl_loi_list_lock */
1623 ptlrpc_set_add_req(set, new_req);
1625 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1627 DEBUG_REQ(D_INFO, new_req, "new request");
1631 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1632 struct lov_stripe_md *lsm, obd_count page_count,
1633 struct brw_page **pga, struct ptlrpc_request_set *set,
1636 struct ptlrpc_request *request;
1637 struct client_obd *cli = &exp->exp_obd->u.cli;
1639 struct osc_brw_async_args *aa;
1642 /* Consume write credits even if doing a sync write -
1643 * otherwise we may run out of space on OST due to grant. */
1644 /* FIXME: unaligned writes must use write grants too */
1645 if (cmd == OBD_BRW_WRITE && pshift == 0) {
1646 client_obd_list_lock(&cli->cl_loi_list_lock);
1647 for (i = 0; i < page_count; i++) {
1648 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1649 osc_consume_write_grant(cli, pga[i]);
1651 client_obd_list_unlock(&cli->cl_loi_list_lock);
1654 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1655 page_count, pga, &request, pshift);
1657 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1660 aa = ptlrpc_req_async_args(request);
1661 if (cmd == OBD_BRW_READ) {
1662 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1663 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1665 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1666 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1667 cli->cl_w_in_flight);
1669 ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1671 LASSERT(list_empty(&aa->aa_oaps));
1673 request->rq_interpret_reply = brw_interpret;
1674 ptlrpc_set_add_req(set, request);
1675 client_obd_list_lock(&cli->cl_loi_list_lock);
1676 if (cmd == OBD_BRW_READ)
1677 cli->cl_r_in_flight++;
1679 cli->cl_w_in_flight++;
1680 client_obd_list_unlock(&cli->cl_loi_list_lock);
1681 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1682 } else if (cmd == OBD_BRW_WRITE) {
1683 client_obd_list_lock(&cli->cl_loi_list_lock);
1684 for (i = 0; i < page_count; i++)
1685 osc_release_write_grant(cli, pga[i], 0);
1686 osc_wake_cache_waiters(cli);
1687 client_obd_list_unlock(&cli->cl_loi_list_lock);
1694 * ugh, we want disk allocation on the target to happen in offset order. we'll
1695 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1696 * fine for our small page arrays and doesn't require allocation. its an
1697 * insertion sort that swaps elements that are strides apart, shrinking the
1698 * stride down until its '1' and the array is sorted.
1700 static void sort_brw_pages(struct brw_page **array, int num)
1703 struct brw_page *tmp;
1707 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1712 for (i = stride ; i < num ; i++) {
1715 while (j >= stride && array[j-stride]->off > tmp->off) {
1716 array[j] = array[j - stride];
1721 } while (stride > 1);
1724 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages,
1731 LASSERT (pages > 0);
1732 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1736 if (pages == 0) /* that's all */
1739 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1740 return count; /* doesn't end on page boundary */
1743 offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK;
1744 if (offset != 0) /* doesn't start on page boundary */
1751 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1753 struct brw_page **ppga;
1756 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1760 for (i = 0; i < count; i++)
1765 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1767 LASSERT(ppga != NULL);
1768 OBD_FREE(ppga, sizeof(*ppga) * count);
1771 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1772 obd_count page_count, struct brw_page *pga,
1773 struct obd_trans_info *oti)
1775 struct obdo *saved_oa = NULL;
1776 struct brw_page **ppga, **orig;
1777 struct obd_import *imp = class_exp2cliimp(exp);
1778 struct client_obd *cli;
1779 int rc, page_count_orig;
1782 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1783 cli = &imp->imp_obd->u.cli;
1785 if (cmd & OBD_BRW_CHECK) {
1786 /* The caller just wants to know if there's a chance that this
1787 * I/O can succeed */
1789 if (imp->imp_invalid)
1794 /* test_brw with a failed create can trip this, maybe others. */
1795 LASSERT(cli->cl_max_pages_per_rpc);
1799 orig = ppga = osc_build_ppga(pga, page_count);
1802 page_count_orig = page_count;
1804 sort_brw_pages(ppga, page_count);
1805 while (page_count) {
1806 obd_count pages_per_brw;
1808 if (page_count > cli->cl_max_pages_per_rpc)
1809 pages_per_brw = cli->cl_max_pages_per_rpc;
1811 pages_per_brw = page_count;
1813 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0);
1815 if (saved_oa != NULL) {
1816 /* restore previously saved oa */
1817 *oinfo->oi_oa = *saved_oa;
1818 } else if (page_count > pages_per_brw) {
1819 /* save a copy of oa (brw will clobber it) */
1820 OBDO_ALLOC(saved_oa);
1821 if (saved_oa == NULL)
1822 GOTO(out, rc = -ENOMEM);
1823 *saved_oa = *oinfo->oi_oa;
1826 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1827 pages_per_brw, ppga);
1832 page_count -= pages_per_brw;
1833 ppga += pages_per_brw;
1837 osc_release_ppga(orig, page_count_orig);
1839 if (saved_oa != NULL)
1840 OBDO_FREE(saved_oa);
1845 static int osc_brw_async(int cmd, struct obd_export *exp,
1846 struct obd_info *oinfo, obd_count page_count,
1847 struct brw_page *pga, struct obd_trans_info *oti,
1848 struct ptlrpc_request_set *set, int pshift)
1850 struct brw_page **ppga, **orig;
1851 int page_count_orig;
1855 if (cmd & OBD_BRW_CHECK) {
1856 /* The caller just wants to know if there's a chance that this
1857 * I/O can succeed */
1858 struct obd_import *imp = class_exp2cliimp(exp);
1860 if (imp == NULL || imp->imp_invalid)
1865 orig = ppga = osc_build_ppga(pga, page_count);
1868 page_count_orig = page_count;
1870 sort_brw_pages(ppga, page_count);
1871 while (page_count) {
1872 struct brw_page **copy;
1874 obd_count pages_per_brw;
1876 /* one page less under unaligned direct i/o */
1877 pages_per_brw = min_t(obd_count, page_count,
1878 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
1881 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
1884 /* use ppga only if single RPC is going to fly */
1885 if (pages_per_brw != page_count_orig || ppga != orig) {
1886 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1888 GOTO(out, rc = -ENOMEM);
1889 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1893 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1894 GOTO(out, rc = -ENOMEM);
1896 memcpy(oa, oinfo->oi_oa, sizeof(*oa));
1897 oa->o_flags |= OBD_FL_TEMPORARY;
1901 LASSERT(!(oa->o_flags & OBD_FL_TEMPORARY));
1904 rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw,
1909 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1911 if (oa->o_flags & OBD_FL_TEMPORARY)
1917 /* we passed it to async_internal() which is
1918 * now responsible for releasing memory */
1922 page_count -= pages_per_brw;
1923 ppga += pages_per_brw;
1927 osc_release_ppga(orig, page_count_orig);
1931 static void osc_check_rpcs(struct client_obd *cli);
1933 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1934 * the dirty accounting. Writeback completes or truncate happens before
1935 * writing starts. Must be called with the loi lock held. */
1936 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1939 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1942 /* This maintains the lists of pending pages to read/write for a given object
1943 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1944 * to quickly find objects that are ready to send an RPC. */
1945 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1951 if (lop->lop_num_pending == 0)
1954 /* if we have an invalid import we want to drain the queued pages
1955 * by forcing them through rpcs that immediately fail and complete
1956 * the pages. recovery relies on this to empty the queued pages
1957 * before canceling the locks and evicting down the llite pages */
1958 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1961 /* stream rpcs in queue order as long as as there is an urgent page
1962 * queued. this is our cheap solution for good batching in the case
1963 * where writepage marks some random page in the middle of the file
1964 * as urgent because of, say, memory pressure */
1965 if (!list_empty(&lop->lop_urgent)) {
1966 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1970 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1971 optimal = cli->cl_max_pages_per_rpc;
1972 if (cmd & OBD_BRW_WRITE) {
1973 /* trigger a write rpc stream as long as there are dirtiers
1974 * waiting for space. as they're waiting, they're not going to
1975 * create more pages to coallesce with what's waiting.. */
1976 if (!list_empty(&cli->cl_cache_waiters)) {
1977 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1981 /* +16 to avoid triggering rpcs that would want to include pages
1982 * that are being queued but which can't be made ready until
1983 * the queuer finishes with the page. this is a wart for
1984 * llite::commit_write() */
1987 if (lop->lop_num_pending >= optimal)
1993 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1995 struct osc_async_page *oap;
1998 if (list_empty(&lop->lop_urgent))
2001 oap = list_entry(lop->lop_urgent.next,
2002 struct osc_async_page, oap_urgent_item);
2004 if (oap->oap_async_flags & ASYNC_HP) {
2005 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2012 static void on_list(struct list_head *item, struct list_head *list,
2015 if (list_empty(item) && should_be_on)
2016 list_add_tail(item, list);
2017 else if (!list_empty(item) && !should_be_on)
2018 list_del_init(item);
2021 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2022 * can find pages to build into rpcs quickly */
2023 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2025 if (lop_makes_hprpc(&loi->loi_write_lop) ||
2026 lop_makes_hprpc(&loi->loi_read_lop)) {
2028 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2029 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2031 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2032 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2033 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2034 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2037 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2038 loi->loi_write_lop.lop_num_pending);
2040 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2041 loi->loi_read_lop.lop_num_pending);
2044 static void lop_update_pending(struct client_obd *cli,
2045 struct loi_oap_pages *lop, int cmd, int delta)
2047 lop->lop_num_pending += delta;
2048 if (cmd & OBD_BRW_WRITE)
2049 cli->cl_pending_w_pages += delta;
2051 cli->cl_pending_r_pages += delta;
2054 /* this is called when a sync waiter receives an interruption. Its job is to
2055 * get the caller woken as soon as possible. If its page hasn't been put in an
2056 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
2057 * desiring interruption which will forcefully complete the rpc once the rpc
2059 static void osc_occ_interrupted(struct oig_callback_context *occ)
2061 struct osc_async_page *oap;
2062 struct loi_oap_pages *lop;
2063 struct lov_oinfo *loi;
2066 /* XXX member_of() */
2067 oap = list_entry(occ, struct osc_async_page, oap_occ);
2069 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
2071 oap->oap_interrupted = 1;
2073 /* ok, it's been put in an rpc. only one oap gets a request reference */
2074 if (oap->oap_request != NULL) {
2075 ptlrpc_mark_interrupted(oap->oap_request);
2076 ptlrpcd_wake(oap->oap_request);
2080 /* we don't get interruption callbacks until osc_trigger_group_io()
2081 * has been called and put the sync oaps in the pending/urgent lists.*/
2082 if (!list_empty(&oap->oap_pending_item)) {
2083 list_del_init(&oap->oap_pending_item);
2084 list_del_init(&oap->oap_urgent_item);
2087 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2088 &loi->loi_write_lop : &loi->loi_read_lop;
2089 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2090 loi_list_maint(oap->oap_cli, oap->oap_loi);
2092 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
2093 oap->oap_oig = NULL;
2097 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
2100 /* this is trying to propogate async writeback errors back up to the
2101 * application. As an async write fails we record the error code for later if
2102 * the app does an fsync. As long as errors persist we force future rpcs to be
2103 * sync so that the app can get a sync error and break the cycle of queueing
2104 * pages for which writeback will fail. */
2105 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2112 ar->ar_force_sync = 1;
2113 ar->ar_min_xid = ptlrpc_sample_next_xid();
2118 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2119 ar->ar_force_sync = 0;
2122 static void osc_oap_to_pending(struct osc_async_page *oap)
2124 struct loi_oap_pages *lop;
2126 if (oap->oap_cmd & OBD_BRW_WRITE)
2127 lop = &oap->oap_loi->loi_write_lop;
2129 lop = &oap->oap_loi->loi_read_lop;
2131 if (oap->oap_async_flags & ASYNC_HP)
2132 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2133 else if (oap->oap_async_flags & ASYNC_URGENT)
2134 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2135 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2136 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2139 /* this must be called holding the loi list lock to give coverage to exit_cache,
2140 * async_flag maintenance, and oap_request */
2141 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
2142 struct osc_async_page *oap, int sent, int rc)
2147 if (oap->oap_request != NULL) {
2148 xid = ptlrpc_req_xid(oap->oap_request);
2149 ptlrpc_req_finished(oap->oap_request);
2150 oap->oap_request = NULL;
2153 spin_lock(&oap->oap_lock);
2154 oap->oap_async_flags = 0;
2155 spin_unlock(&oap->oap_lock);
2156 oap->oap_interrupted = 0;
2158 if (oap->oap_cmd & OBD_BRW_WRITE) {
2159 osc_process_ar(&cli->cl_ar, xid, rc);
2160 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2163 if (rc == 0 && oa != NULL) {
2164 if (oa->o_valid & OBD_MD_FLBLOCKS)
2165 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2166 if (oa->o_valid & OBD_MD_FLMTIME)
2167 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2168 if (oa->o_valid & OBD_MD_FLATIME)
2169 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2170 if (oa->o_valid & OBD_MD_FLCTIME)
2171 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2175 osc_exit_cache(cli, oap, sent);
2176 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2177 oap->oap_oig = NULL;
2182 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2183 oap->oap_cmd, oa, rc);
2185 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2186 * I/O on the page could start, but OSC calls it under lock
2187 * and thus we can add oap back to pending safely */
2189 /* upper layer wants to leave the page on pending queue */
2190 osc_oap_to_pending(oap);
2192 osc_exit_cache(cli, oap, sent);
2196 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2198 struct osc_brw_async_args *aa = data;
2199 struct client_obd *cli;
2202 rc = osc_brw_fini_request(request, rc);
2203 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2205 if (osc_recoverable_error(rc)) {
2206 rc = osc_brw_redo_request(request, aa);
2212 client_obd_list_lock(&cli->cl_loi_list_lock);
2213 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2214 * is called so we know whether to go to sync BRWs or wait for more
2215 * RPCs to complete */
2216 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2217 cli->cl_w_in_flight--;
2219 cli->cl_r_in_flight--;
2221 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2222 struct osc_async_page *oap, *tmp;
2223 /* the caller may re-use the oap after the completion call so
2224 * we need to clean it up a little */
2225 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2226 list_del_init(&oap->oap_rpc_item);
2227 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2229 OBDO_FREE(aa->aa_oa);
2230 } else { /* from async_internal() */
2232 for (i = 0; i < aa->aa_page_count; i++)
2233 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2235 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2236 OBDO_FREE(aa->aa_oa);
2238 osc_wake_cache_waiters(cli);
2239 osc_check_rpcs(cli);
2240 client_obd_list_unlock(&cli->cl_loi_list_lock);
2242 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2247 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2248 struct list_head *rpc_list,
2249 int page_count, int cmd)
2251 struct ptlrpc_request *req;
2252 struct brw_page **pga = NULL;
2253 struct osc_brw_async_args *aa;
2254 struct obdo *oa = NULL;
2255 struct obd_async_page_ops *ops = NULL;
2256 void *caller_data = NULL;
2257 struct osc_async_page *oap;
2258 struct ldlm_lock *lock = NULL;
2263 LASSERT(!list_empty(rpc_list));
2265 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2267 RETURN(ERR_PTR(-ENOMEM));
2271 GOTO(out, req = ERR_PTR(-ENOMEM));
2274 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2276 ops = oap->oap_caller_ops;
2277 caller_data = oap->oap_caller_data;
2278 lock = oap->oap_ldlm_lock;
2280 pga[i] = &oap->oap_brw_page;
2281 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2282 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2283 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2287 /* always get the data for the obdo for the rpc */
2288 LASSERT(ops != NULL);
2289 ops->ap_fill_obdo(caller_data, cmd, oa);
2291 oa->o_handle = lock->l_remote_handle;
2292 oa->o_valid |= OBD_MD_FLHANDLE;
2295 sort_brw_pages(pga, page_count);
2296 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0);
2298 CERROR("prep_req failed: %d\n", rc);
2299 GOTO(out, req = ERR_PTR(rc));
2301 oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2302 sizeof(struct ost_body)))->oa;
2304 /* Need to update the timestamps after the request is built in case
2305 * we race with setattr (locally or in queue at OST). If OST gets
2306 * later setattr before earlier BRW (as determined by the request xid),
2307 * the OST will not use BRW timestamps. Sadly, there is no obvious
2308 * way to do this in a single call. bug 10150 */
2309 if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2310 /* in case of lockless read/write do not use inode's
2311 * timestamps because concurrent stat might fill the
2312 * inode with out-of-date times, send current
2314 if (cmd & OBD_BRW_WRITE) {
2315 oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2316 oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2317 valid = OBD_MD_FLATIME;
2319 oa->o_atime = LTIME_S(CURRENT_TIME);
2320 oa->o_valid |= OBD_MD_FLATIME;
2321 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2324 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2326 ops->ap_update_obdo(caller_data, cmd, oa, valid);
2328 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2329 aa = ptlrpc_req_async_args(req);
2330 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2331 list_splice(rpc_list, &aa->aa_oaps);
2332 CFS_INIT_LIST_HEAD(rpc_list);
2339 OBD_FREE(pga, sizeof(*pga) * page_count);
2344 /* the loi lock is held across this function but it's allowed to release
2345 * and reacquire it during its work */
2347 * prepare pages for ASYNC io and put pages in send queue.
2351 * \param cmd - OBD_BRW_* macroses
2352 * \param lop - pending pages
2354 * \return zero if pages successfully add to send queue.
2355 * \return not zere if error occurring.
2357 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2358 int cmd, struct loi_oap_pages *lop)
2360 struct ptlrpc_request *req;
2361 obd_count page_count = 0;
2362 struct osc_async_page *oap = NULL, *tmp;
2363 struct osc_brw_async_args *aa;
2364 struct obd_async_page_ops *ops;
2365 CFS_LIST_HEAD(rpc_list);
2366 unsigned int ending_offset;
2367 unsigned starting_offset = 0;
2371 /* If there are HP OAPs we need to handle at least 1 of them,
2372 * move it the beginning of the pending list for that. */
2373 if (!list_empty(&lop->lop_urgent)) {
2374 oap = list_entry(lop->lop_urgent.next,
2375 struct osc_async_page, oap_urgent_item);
2376 if (oap->oap_async_flags & ASYNC_HP)
2377 list_move(&oap->oap_pending_item, &lop->lop_pending);
2380 /* first we find the pages we're allowed to work with */
2381 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2382 ops = oap->oap_caller_ops;
2384 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2385 "magic 0x%x\n", oap, oap->oap_magic);
2387 if (page_count != 0 &&
2388 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2389 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2390 " oap %p, page %p, srvlock %u\n",
2391 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2394 /* in llite being 'ready' equates to the page being locked
2395 * until completion unlocks it. commit_write submits a page
2396 * as not ready because its unlock will happen unconditionally
2397 * as the call returns. if we race with commit_write giving
2398 * us that page we dont' want to create a hole in the page
2399 * stream, so we stop and leave the rpc to be fired by
2400 * another dirtier or kupdated interval (the not ready page
2401 * will still be on the dirty list). we could call in
2402 * at the end of ll_file_write to process the queue again. */
2403 if (!(oap->oap_async_flags & ASYNC_READY)) {
2404 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2406 CDEBUG(D_INODE, "oap %p page %p returned %d "
2407 "instead of ready\n", oap,
2411 /* llite is telling us that the page is still
2412 * in commit_write and that we should try
2413 * and put it in an rpc again later. we
2414 * break out of the loop so we don't create
2415 * a hole in the sequence of pages in the rpc
2420 /* the io isn't needed.. tell the checks
2421 * below to complete the rpc with EINTR */
2422 spin_lock(&oap->oap_lock);
2423 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2424 spin_unlock(&oap->oap_lock);
2425 oap->oap_count = -EINTR;
2428 spin_lock(&oap->oap_lock);
2429 oap->oap_async_flags |= ASYNC_READY;
2430 spin_unlock(&oap->oap_lock);
2433 LASSERTF(0, "oap %p page %p returned %d "
2434 "from make_ready\n", oap,
2442 * Page submitted for IO has to be locked. Either by
2443 * ->ap_make_ready() or by higher layers.
2445 #if defined(__KERNEL__) && defined(__linux__)
2446 if(!(PageLocked(oap->oap_page) &&
2447 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2448 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2449 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2453 /* If there is a gap at the start of this page, it can't merge
2454 * with any previous page, so we'll hand the network a
2455 * "fragmented" page array that it can't transfer in 1 RDMA */
2456 if (page_count != 0 && oap->oap_page_off != 0)
2459 /* take the page out of our book-keeping */
2460 list_del_init(&oap->oap_pending_item);
2461 lop_update_pending(cli, lop, cmd, -1);
2462 list_del_init(&oap->oap_urgent_item);
2464 if (page_count == 0)
2465 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2466 (PTLRPC_MAX_BRW_SIZE - 1);
2468 /* ask the caller for the size of the io as the rpc leaves. */
2469 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2471 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2472 if (oap->oap_count <= 0) {
2473 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2475 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2479 /* now put the page back in our accounting */
2480 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2481 if (page_count == 0)
2482 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2483 if (++page_count >= cli->cl_max_pages_per_rpc)
2486 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2487 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2488 * have the same alignment as the initial writes that allocated
2489 * extents on the server. */
2490 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2491 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2492 if (ending_offset == 0)
2495 /* If there is a gap at the end of this page, it can't merge
2496 * with any subsequent pages, so we'll hand the network a
2497 * "fragmented" page array that it can't transfer in 1 RDMA */
2498 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2502 osc_wake_cache_waiters(cli);
2504 if (page_count == 0)
2507 loi_list_maint(cli, loi);
2509 client_obd_list_unlock(&cli->cl_loi_list_lock);
2511 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2513 /* this should happen rarely and is pretty bad, it makes the
2514 * pending list not follow the dirty order */
2515 client_obd_list_lock(&cli->cl_loi_list_lock);
2516 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2517 list_del_init(&oap->oap_rpc_item);
2519 /* queued sync pages can be torn down while the pages
2520 * were between the pending list and the rpc */
2521 if (oap->oap_interrupted) {
2522 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2523 osc_ap_completion(cli, NULL, oap, 0,
2527 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2529 loi_list_maint(cli, loi);
2530 RETURN(PTR_ERR(req));
2533 aa = ptlrpc_req_async_args(req);
2534 if (cmd == OBD_BRW_READ) {
2535 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2536 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2537 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2538 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2540 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2541 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2542 cli->cl_w_in_flight);
2543 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2544 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2546 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2548 client_obd_list_lock(&cli->cl_loi_list_lock);
2550 if (cmd == OBD_BRW_READ)
2551 cli->cl_r_in_flight++;
2553 cli->cl_w_in_flight++;
2555 /* queued sync pages can be torn down while the pages
2556 * were between the pending list and the rpc */
2558 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2559 /* only one oap gets a request reference */
2562 if (oap->oap_interrupted && !req->rq_intr) {
2563 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2565 ptlrpc_mark_interrupted(req);
2569 tmp->oap_request = ptlrpc_request_addref(req);
2571 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2572 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2574 req->rq_interpret_reply = brw_interpret;
2575 ptlrpcd_add_req(req);
2579 #define LOI_DEBUG(LOI, STR, args...) \
2580 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2581 !list_empty(&(LOI)->loi_ready_item) || \
2582 !list_empty(&(LOI)->loi_hp_ready_item), \
2583 (LOI)->loi_write_lop.lop_num_pending, \
2584 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2585 (LOI)->loi_read_lop.lop_num_pending, \
2586 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2589 /* This is called by osc_check_rpcs() to find which objects have pages that
2590 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2591 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2594 /* First return objects that have blocked locks so that they
2595 * will be flushed quickly and other clients can get the lock,
2596 * then objects which have pages ready to be stuffed into RPCs */
2597 if (!list_empty(&cli->cl_loi_hp_ready_list))
2598 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2599 struct lov_oinfo, loi_hp_ready_item));
2600 if (!list_empty(&cli->cl_loi_ready_list))
2601 RETURN(list_entry(cli->cl_loi_ready_list.next,
2602 struct lov_oinfo, loi_ready_item));
2604 /* then if we have cache waiters, return all objects with queued
2605 * writes. This is especially important when many small files
2606 * have filled up the cache and not been fired into rpcs because
2607 * they don't pass the nr_pending/object threshhold */
2608 if (!list_empty(&cli->cl_cache_waiters) &&
2609 !list_empty(&cli->cl_loi_write_list))
2610 RETURN(list_entry(cli->cl_loi_write_list.next,
2611 struct lov_oinfo, loi_write_item));
2613 /* then return all queued objects when we have an invalid import
2614 * so that they get flushed */
2615 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2616 if (!list_empty(&cli->cl_loi_write_list))
2617 RETURN(list_entry(cli->cl_loi_write_list.next,
2618 struct lov_oinfo, loi_write_item));
2619 if (!list_empty(&cli->cl_loi_read_list))
2620 RETURN(list_entry(cli->cl_loi_read_list.next,
2621 struct lov_oinfo, loi_read_item));
2626 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2628 struct osc_async_page *oap;
2631 if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2632 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2633 struct osc_async_page, oap_urgent_item);
2634 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2637 if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2638 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2639 struct osc_async_page, oap_urgent_item);
2640 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2643 return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2646 /* called with the loi list lock held */
2647 static void osc_check_rpcs(struct client_obd *cli)
2649 struct lov_oinfo *loi;
2650 int rc = 0, race_counter = 0;
2653 while ((loi = osc_next_loi(cli)) != NULL) {
2654 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2656 if (osc_max_rpc_in_flight(cli, loi))
2659 /* attempt some read/write balancing by alternating between
2660 * reads and writes in an object. The makes_rpc checks here
2661 * would be redundant if we were getting read/write work items
2662 * instead of objects. we don't want send_oap_rpc to drain a
2663 * partial read pending queue when we're given this object to
2664 * do io on writes while there are cache waiters */
2665 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2666 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2667 &loi->loi_write_lop);
2675 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2676 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2677 &loi->loi_read_lop);
2686 /* attempt some inter-object balancing by issueing rpcs
2687 * for each object in turn */
2688 if (!list_empty(&loi->loi_hp_ready_item))
2689 list_del_init(&loi->loi_hp_ready_item);
2690 if (!list_empty(&loi->loi_ready_item))
2691 list_del_init(&loi->loi_ready_item);
2692 if (!list_empty(&loi->loi_write_item))
2693 list_del_init(&loi->loi_write_item);
2694 if (!list_empty(&loi->loi_read_item))
2695 list_del_init(&loi->loi_read_item);
2697 loi_list_maint(cli, loi);
2699 /* send_oap_rpc fails with 0 when make_ready tells it to
2700 * back off. llite's make_ready does this when it tries
2701 * to lock a page queued for write that is already locked.
2702 * we want to try sending rpcs from many objects, but we
2703 * don't want to spin failing with 0. */
2704 if (race_counter == 10)
2710 /* we're trying to queue a page in the osc so we're subject to the
2711 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2712 * If the osc's queued pages are already at that limit, then we want to sleep
2713 * until there is space in the osc's queue for us. We also may be waiting for
2714 * write credits from the OST if there are RPCs in flight that may return some
2715 * before we fall back to sync writes.
2717 * We need this know our allocation was granted in the presence of signals */
2718 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2722 client_obd_list_lock(&cli->cl_loi_list_lock);
2723 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2724 client_obd_list_unlock(&cli->cl_loi_list_lock);
2728 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2729 * grant or cache space. */
2730 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2731 struct osc_async_page *oap)
2733 struct osc_cache_waiter ocw;
2734 struct l_wait_info lwi = { 0 };
2737 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2738 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2739 cli->cl_dirty_max, obd_max_dirty_pages,
2740 cli->cl_lost_grant, cli->cl_avail_grant);
2742 /* force the caller to try sync io. this can jump the list
2743 * of queued writes and create a discontiguous rpc stream */
2744 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2745 loi->loi_ar.ar_force_sync)
2748 /* Hopefully normal case - cache space and write credits available */
2749 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2750 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2751 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2752 /* account for ourselves */
2753 osc_consume_write_grant(cli, &oap->oap_brw_page);
2757 /* Make sure that there are write rpcs in flight to wait for. This
2758 * is a little silly as this object may not have any pending but
2759 * other objects sure might. */
2760 if (cli->cl_w_in_flight) {
2761 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2762 cfs_waitq_init(&ocw.ocw_waitq);
2766 loi_list_maint(cli, loi);
2767 osc_check_rpcs(cli);
2768 client_obd_list_unlock(&cli->cl_loi_list_lock);
2770 CDEBUG(D_CACHE, "sleeping for cache space\n");
2771 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2773 client_obd_list_lock(&cli->cl_loi_list_lock);
2774 if (!list_empty(&ocw.ocw_entry)) {
2775 list_del(&ocw.ocw_entry);
2784 static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
2785 void **res, int rw, obd_off start, obd_off end,
2786 struct lustre_handle *lockh, int flags)
2788 struct ldlm_lock *lock = NULL;
2789 int rc, release = 0;
2793 if (lockh && lustre_handle_is_used(lockh)) {
2794 /* if a valid lockh is passed, just check that the corresponding
2795 * lock covers the extent */
2796 lock = ldlm_handle2lock(lockh);
2799 struct osc_async_page *oap = *res;
2800 spin_lock(&oap->oap_lock);
2801 lock = oap->oap_ldlm_lock;
2803 LDLM_LOCK_GET(lock);
2804 spin_unlock(&oap->oap_lock);
2806 /* lock can be NULL in case race obd_get_lock vs lock cancel
2807 * so we should be don't try match this */
2808 if (unlikely(!lock))
2811 rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
2812 if (release == 1 && rc == 1)
2813 /* if a valid lockh was passed, we just need to check
2814 * that the lock covers the page, no reference should be
2816 ldlm_lock_decref(lockh,
2817 rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
2818 LDLM_LOCK_PUT(lock);
2822 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2823 struct lov_oinfo *loi, cfs_page_t *page,
2824 obd_off offset, struct obd_async_page_ops *ops,
2825 void *data, void **res, int flags,
2826 struct lustre_handle *lockh)
2828 struct osc_async_page *oap;
2829 struct ldlm_res_id oid = {{0}};
2835 return size_round(sizeof(*oap));
2838 oap->oap_magic = OAP_MAGIC;
2839 oap->oap_cli = &exp->exp_obd->u.cli;
2842 oap->oap_caller_ops = ops;
2843 oap->oap_caller_data = data;
2845 oap->oap_page = page;
2846 oap->oap_obj_off = offset;
2848 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2849 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2850 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2851 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2853 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2855 spin_lock_init(&oap->oap_lock);
2857 /* If the page was marked as notcacheable - don't add to any locks */
2858 if (!(flags & OBD_PAGE_NO_CACHE)) {
2859 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2860 /* This is the only place where we can call cache_add_extent
2861 without oap_lock, because this page is locked now, and
2862 the lock we are adding it to is referenced, so cannot lose
2863 any pages either. */
2864 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2869 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2873 struct osc_async_page *oap_from_cookie(void *cookie)
2875 struct osc_async_page *oap = cookie;
2876 if (oap->oap_magic != OAP_MAGIC)
2877 return ERR_PTR(-EINVAL);
2881 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2882 struct lov_oinfo *loi, void *cookie,
2883 int cmd, obd_off off, int count,
2884 obd_flag brw_flags, enum async_flags async_flags)
2886 struct client_obd *cli = &exp->exp_obd->u.cli;
2887 struct osc_async_page *oap;
2891 oap = oap_from_cookie(cookie);
2893 RETURN(PTR_ERR(oap));
2895 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2898 if (!list_empty(&oap->oap_pending_item) ||
2899 !list_empty(&oap->oap_urgent_item) ||
2900 !list_empty(&oap->oap_rpc_item))
2903 /* check if the file's owner/group is over quota */
2904 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2905 struct obd_async_page_ops *ops;
2912 ops = oap->oap_caller_ops;
2913 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2914 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2924 loi = lsm->lsm_oinfo[0];
2926 client_obd_list_lock(&cli->cl_loi_list_lock);
2929 oap->oap_page_off = off;
2930 oap->oap_count = count;
2931 oap->oap_brw_flags = brw_flags;
2932 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2933 if (libcfs_memory_pressure_get())
2934 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2935 spin_lock(&oap->oap_lock);
2936 oap->oap_async_flags = async_flags;
2937 spin_unlock(&oap->oap_lock);
2939 if (cmd & OBD_BRW_WRITE) {
2940 rc = osc_enter_cache(cli, loi, oap);
2942 client_obd_list_unlock(&cli->cl_loi_list_lock);
2947 osc_oap_to_pending(oap);
2948 loi_list_maint(cli, loi);
2950 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2953 osc_check_rpcs(cli);
2954 client_obd_list_unlock(&cli->cl_loi_list_lock);
2959 /* aka (~was & now & flag), but this is more clear :) */
2960 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2962 static int osc_set_async_flags(struct obd_export *exp,
2963 struct lov_stripe_md *lsm,
2964 struct lov_oinfo *loi, void *cookie,
2965 obd_flag async_flags)
2967 struct client_obd *cli = &exp->exp_obd->u.cli;
2968 struct loi_oap_pages *lop;
2969 struct osc_async_page *oap;
2973 oap = oap_from_cookie(cookie);
2975 RETURN(PTR_ERR(oap));
2978 * bug 7311: OST-side locking is only supported for liblustre for now
2979 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2980 * implementation has to handle case where OST-locked page was picked
2981 * up by, e.g., ->writepage().
2983 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2984 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2987 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2991 loi = lsm->lsm_oinfo[0];
2993 if (oap->oap_cmd & OBD_BRW_WRITE) {
2994 lop = &loi->loi_write_lop;
2996 lop = &loi->loi_read_lop;
2999 client_obd_list_lock(&cli->cl_loi_list_lock);
3000 /* oap_lock provides atomic semantics of oap_async_flags access */
3001 spin_lock(&oap->oap_lock);
3002 if (list_empty(&oap->oap_pending_item))
3003 GOTO(out, rc = -EINVAL);
3005 if ((oap->oap_async_flags & async_flags) == async_flags)
3008 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3009 oap->oap_async_flags |= ASYNC_READY;
3011 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3012 list_empty(&oap->oap_rpc_item)) {
3013 if (oap->oap_async_flags & ASYNC_HP)
3014 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3016 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
3017 oap->oap_async_flags |= ASYNC_URGENT;
3018 loi_list_maint(cli, loi);
3021 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3022 oap->oap_async_flags);
3024 spin_unlock(&oap->oap_lock);
3025 osc_check_rpcs(cli);
3026 client_obd_list_unlock(&cli->cl_loi_list_lock);
3030 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
3031 struct lov_oinfo *loi,
3032 struct obd_io_group *oig, void *cookie,
3033 int cmd, obd_off off, int count,
3035 obd_flag async_flags)
3037 struct client_obd *cli = &exp->exp_obd->u.cli;
3038 struct osc_async_page *oap;
3039 struct loi_oap_pages *lop;
3043 oap = oap_from_cookie(cookie);
3045 RETURN(PTR_ERR(oap));
3047 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3050 if (!list_empty(&oap->oap_pending_item) ||
3051 !list_empty(&oap->oap_urgent_item) ||
3052 !list_empty(&oap->oap_rpc_item))
3056 loi = lsm->lsm_oinfo[0];
3058 client_obd_list_lock(&cli->cl_loi_list_lock);
3061 oap->oap_page_off = off;
3062 oap->oap_count = count;
3063 oap->oap_brw_flags = brw_flags;
3064 /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3065 if (libcfs_memory_pressure_get())
3066 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3067 spin_lock(&oap->oap_lock);
3068 oap->oap_async_flags = async_flags;
3069 spin_unlock(&oap->oap_lock);
3071 if (cmd & OBD_BRW_WRITE)
3072 lop = &loi->loi_write_lop;
3074 lop = &loi->loi_read_lop;
3076 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
3077 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
3079 rc = oig_add_one(oig, &oap->oap_occ);
3082 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
3083 oap, oap->oap_page, rc);
3085 client_obd_list_unlock(&cli->cl_loi_list_lock);
3090 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
3091 struct loi_oap_pages *lop, int cmd)
3093 struct list_head *pos, *tmp;
3094 struct osc_async_page *oap;
3096 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
3097 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
3098 list_del(&oap->oap_pending_item);
3099 osc_oap_to_pending(oap);
3101 loi_list_maint(cli, loi);
3104 static int osc_trigger_group_io(struct obd_export *exp,
3105 struct lov_stripe_md *lsm,
3106 struct lov_oinfo *loi,
3107 struct obd_io_group *oig)
3109 struct client_obd *cli = &exp->exp_obd->u.cli;
3113 loi = lsm->lsm_oinfo[0];
3115 client_obd_list_lock(&cli->cl_loi_list_lock);
3117 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
3118 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
3120 osc_check_rpcs(cli);
3121 client_obd_list_unlock(&cli->cl_loi_list_lock);
3126 static int osc_teardown_async_page(struct obd_export *exp,
3127 struct lov_stripe_md *lsm,
3128 struct lov_oinfo *loi, void *cookie)
3130 struct client_obd *cli = &exp->exp_obd->u.cli;
3131 struct loi_oap_pages *lop;
3132 struct osc_async_page *oap;
3136 oap = oap_from_cookie(cookie);
3138 RETURN(PTR_ERR(oap));
3141 loi = lsm->lsm_oinfo[0];
3143 if (oap->oap_cmd & OBD_BRW_WRITE) {
3144 lop = &loi->loi_write_lop;
3146 lop = &loi->loi_read_lop;
3149 client_obd_list_lock(&cli->cl_loi_list_lock);
3151 if (!list_empty(&oap->oap_rpc_item))
3152 GOTO(out, rc = -EBUSY);
3154 osc_exit_cache(cli, oap, 0);
3155 osc_wake_cache_waiters(cli);
3157 if (!list_empty(&oap->oap_urgent_item)) {
3158 list_del_init(&oap->oap_urgent_item);
3159 spin_lock(&oap->oap_lock);
3160 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3161 spin_unlock(&oap->oap_lock);
3164 if (!list_empty(&oap->oap_pending_item)) {
3165 list_del_init(&oap->oap_pending_item);
3166 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3168 loi_list_maint(cli, loi);
3169 cache_remove_extent(cli->cl_cache, oap);
3171 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3173 client_obd_list_unlock(&cli->cl_loi_list_lock);
3177 int osc_extent_blocking_cb(struct ldlm_lock *lock,
3178 struct ldlm_lock_desc *new, void *data,
3181 struct lustre_handle lockh = { 0 };
3185 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
3186 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3191 case LDLM_CB_BLOCKING:
3192 ldlm_lock2handle(lock, &lockh);
3193 rc = ldlm_cli_cancel(&lockh);
3195 CERROR("ldlm_cli_cancel failed: %d\n", rc);
3197 case LDLM_CB_CANCELING: {
3199 ldlm_lock2handle(lock, &lockh);
3200 /* This lock wasn't granted, don't try to do anything */
3201 if (lock->l_req_mode != lock->l_granted_mode)
3204 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3207 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3208 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3209 lock, new, data,flag);
3218 EXPORT_SYMBOL(osc_extent_blocking_cb);
3220 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3223 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3226 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3229 lock_res_and_lock(lock);
3230 #if defined (__KERNEL__) && defined (__linux__)
3231 /* Liang XXX: Darwin and Winnt checking should be added */
3232 if (lock->l_ast_data && lock->l_ast_data != data) {
3233 struct inode *new_inode = data;
3234 struct inode *old_inode = lock->l_ast_data;
3235 if (!(old_inode->i_state & I_FREEING))
3236 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3237 LASSERTF(old_inode->i_state & I_FREEING,
3238 "Found existing inode %p/%lu/%u state %lu in lock: "
3239 "setting data to %p/%lu/%u\n", old_inode,
3240 old_inode->i_ino, old_inode->i_generation,
3242 new_inode, new_inode->i_ino, new_inode->i_generation);
3245 lock->l_ast_data = data;
3246 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3247 unlock_res_and_lock(lock);
3248 LDLM_LOCK_PUT(lock);
3251 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3252 ldlm_iterator_t replace, void *data)
3254 struct ldlm_res_id res_id;
3255 struct obd_device *obd = class_exp2obd(exp);
3257 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3258 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3262 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3263 struct obd_info *oinfo, int intent, int rc)
3268 /* The request was created before ldlm_cli_enqueue call. */
3269 if (rc == ELDLM_LOCK_ABORTED) {
3270 struct ldlm_reply *rep;
3272 /* swabbed by ldlm_cli_enqueue() */
3273 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3274 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3276 LASSERT(rep != NULL);
3277 if (rep->lock_policy_res1)
3278 rc = rep->lock_policy_res1;
3282 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3283 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3284 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3285 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3286 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3290 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3292 /* Call the update callback. */
3293 rc = oinfo->oi_cb_up(oinfo, rc);
3297 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3298 struct osc_enqueue_args *aa, int rc)
3300 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3301 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3302 struct ldlm_lock *lock;
3304 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3306 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3308 /* Complete obtaining the lock procedure. */
3309 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3311 &aa->oa_oi->oi_flags,
3312 &lsm->lsm_oinfo[0]->loi_lvb,
3313 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3314 lustre_swab_ost_lvb,
3315 aa->oa_oi->oi_lockh, rc);
3317 /* Complete osc stuff. */
3318 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3320 /* Release the lock for async request. */
3321 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3322 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3324 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3325 aa->oa_oi->oi_lockh, req, aa);
3326 LDLM_LOCK_PUT(lock);
3330 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3331 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3332 * other synchronous requests, however keeping some locks and trying to obtain
3333 * others may take a considerable amount of time in a case of ost failure; and
3334 * when other sync requests do not get released lock from a client, the client
3335 * is excluded from the cluster -- such scenarious make the life difficult, so
3336 * release locks just after they are obtained. */
3337 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3338 struct ldlm_enqueue_info *einfo,
3339 struct ptlrpc_request_set *rqset)
3341 struct ldlm_res_id res_id;
3342 struct obd_device *obd = exp->exp_obd;
3343 struct ldlm_reply *rep;
3344 struct ptlrpc_request *req = NULL;
3345 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3350 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3351 oinfo->oi_md->lsm_object_gr, &res_id);
3352 /* Filesystem lock extents are extended to page boundaries so that
3353 * dealing with the page cache is a little smoother. */
3354 oinfo->oi_policy.l_extent.start -=
3355 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3356 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3358 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3361 /* Next, search for already existing extent locks that will cover us */
3362 /* If we're trying to read, we also search for an existing PW lock. The
3363 * VFS and page cache already protect us locally, so lots of readers/
3364 * writers can share a single PW lock.
3366 * There are problems with conversion deadlocks, so instead of
3367 * converting a read lock to a write lock, we'll just enqueue a new
3370 * At some point we should cancel the read lock instead of making them
3371 * send us a blocking callback, but there are problems with canceling
3372 * locks out from other users right now, too. */
3373 mode = einfo->ei_mode;
3374 if (einfo->ei_mode == LCK_PR)
3376 mode = ldlm_lock_match(obd->obd_namespace,
3377 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3378 einfo->ei_type, &oinfo->oi_policy, mode,
3381 /* addref the lock only if not async requests and PW lock is
3382 * matched whereas we asked for PR. */
3383 if (!rqset && einfo->ei_mode != mode)
3384 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3385 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3388 /* I would like to be able to ASSERT here that rss <=
3389 * kms, but I can't, for reasons which are explained in
3393 /* We already have a lock, and it's referenced */
3394 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3396 /* For async requests, decref the lock. */
3397 if (einfo->ei_mode != mode)
3398 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3400 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3408 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3409 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
3410 [DLM_LOCKREQ_OFF + 1] = 0 };
3412 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3416 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3417 size[DLM_REPLY_REC_OFF] =
3418 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3419 ptlrpc_req_set_repsize(req, 3, size);
3422 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3423 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3425 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3426 &oinfo->oi_policy, &oinfo->oi_flags,
3427 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3428 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3429 lustre_swab_ost_lvb, oinfo->oi_lockh,
3433 struct osc_enqueue_args *aa;
3434 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3435 aa = ptlrpc_req_async_args(req);
3440 req->rq_interpret_reply = osc_enqueue_interpret;
3441 ptlrpc_set_add_req(rqset, req);
3442 } else if (intent) {
3443 ptlrpc_req_finished(req);
3448 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3450 ptlrpc_req_finished(req);
3455 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3456 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3457 int *flags, void *data, struct lustre_handle *lockh,
3460 struct ldlm_res_id res_id;
3461 struct obd_device *obd = exp->exp_obd;
3462 int lflags = *flags;
3466 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3468 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3470 /* Filesystem lock extents are extended to page boundaries so that
3471 * dealing with the page cache is a little smoother */
3472 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3473 policy->l_extent.end |= ~CFS_PAGE_MASK;
3475 /* Next, search for already existing extent locks that will cover us */
3476 /* If we're trying to read, we also search for an existing PW lock. The
3477 * VFS and page cache already protect us locally, so lots of readers/
3478 * writers can share a single PW lock. */
3482 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3483 &res_id, type, policy, rc, lockh);
3485 osc_set_data_with_check(lockh, data, lflags);
3486 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3487 ldlm_lock_addref(lockh, LCK_PR);
3488 ldlm_lock_decref(lockh, LCK_PW);
3490 if (n_matches != NULL)
3497 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3498 __u32 mode, struct lustre_handle *lockh, int flags,
3503 if (unlikely(mode == LCK_GROUP))
3504 ldlm_lock_decref_and_cancel(lockh, mode);
3506 ldlm_lock_decref(lockh, mode);
3511 static int osc_cancel_unused(struct obd_export *exp,
3512 struct lov_stripe_md *lsm, int flags, void *opaque)
3514 struct obd_device *obd = class_exp2obd(exp);
3515 struct ldlm_res_id res_id, *resp = NULL;
3518 resp = osc_build_res_name(lsm->lsm_object_id,
3519 lsm->lsm_object_gr, &res_id);
3522 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3526 static int osc_join_lru(struct obd_export *exp,
3527 struct lov_stripe_md *lsm, int join)
3529 struct obd_device *obd = class_exp2obd(exp);
3530 struct ldlm_res_id res_id, *resp = NULL;
3533 resp = osc_build_res_name(lsm->lsm_object_id,
3534 lsm->lsm_object_gr, &res_id);
3537 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3541 static int osc_statfs_interpret(struct ptlrpc_request *req,
3542 struct osc_async_args *aa, int rc)
3544 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3545 struct obd_statfs *msfs;
3549 /* The request has in fact never been sent
3550 * due to issues at a higher level (LOV).
3551 * Exit immediately since the caller is
3552 * aware of the problem and takes care
3553 * of the clean up */
3556 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3557 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3563 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3564 lustre_swab_obd_statfs);
3566 CERROR("Can't unpack obd_statfs\n");
3567 GOTO(out, rc = -EPROTO);
3570 /* Reinitialize the RDONLY and DEGRADED flags at the client
3571 * on each statfs, so they don't stay set permanently. */
3572 spin_lock(&cli->cl_oscc.oscc_lock);
3573 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_RDONLY | OSCC_FLAG_DEGRADED);
3574 if (msfs->os_state & OS_STATE_DEGRADED)
3575 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3577 if (msfs->os_state & OS_STATE_READONLY)
3578 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3579 spin_unlock(&cli->cl_oscc.oscc_lock);
3581 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3583 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3587 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3588 __u64 max_age, struct ptlrpc_request_set *rqset)
3590 struct ptlrpc_request *req;
3591 struct osc_async_args *aa;
3592 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3595 /* We could possibly pass max_age in the request (as an absolute
3596 * timestamp or a "seconds.usec ago") so the target can avoid doing
3597 * extra calls into the filesystem if that isn't necessary (e.g.
3598 * during mount that would help a bit). Having relative timestamps
3599 * is not so great if request processing is slow, while absolute
3600 * timestamps are not ideal because they need time synchronization. */
3601 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3602 OST_STATFS, 1, NULL, NULL);
3606 ptlrpc_req_set_repsize(req, 2, size);
3607 req->rq_request_portal = OST_CREATE_PORTAL;
3608 ptlrpc_at_set_req_timeout(req);
3609 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3610 /* procfs requests not want stat in wait for avoid deadlock */
3611 req->rq_no_resend = 1;
3612 req->rq_no_delay = 1;
3615 req->rq_interpret_reply = osc_statfs_interpret;
3616 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3617 aa = ptlrpc_req_async_args(req);
3620 ptlrpc_set_add_req(rqset, req);
3624 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3625 __u64 max_age, __u32 flags)
3627 struct obd_statfs *msfs;
3628 struct ptlrpc_request *req;
3629 struct obd_import *imp = NULL;
3630 __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3634 /*Since the request might also come from lprocfs, so we need
3635 *sync this with client_disconnect_export Bug15684*/
3636 down_read(&obd->u.cli.cl_sem);
3637 if (obd->u.cli.cl_import)
3638 imp = class_import_get(obd->u.cli.cl_import);
3639 up_read(&obd->u.cli.cl_sem);
3643 /* We could possibly pass max_age in the request (as an absolute
3644 * timestamp or a "seconds.usec ago") so the target can avoid doing
3645 * extra calls into the filesystem if that isn't necessary (e.g.
3646 * during mount that would help a bit). Having relative timestamps
3647 * is not so great if request processing is slow, while absolute
3648 * timestamps are not ideal because they need time synchronization. */
3649 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3650 OST_STATFS, 1, NULL, NULL);
3652 class_import_put(imp);
3656 ptlrpc_req_set_repsize(req, 2, size);
3657 req->rq_request_portal = OST_CREATE_PORTAL;
3658 ptlrpc_at_set_req_timeout(req);
3660 if (flags & OBD_STATFS_NODELAY) {
3661 /* procfs requests not want stat in wait for avoid deadlock */
3662 req->rq_no_resend = 1;
3663 req->rq_no_delay = 1;
3666 rc = ptlrpc_queue_wait(req);
3670 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3671 lustre_swab_obd_statfs);
3673 CERROR("Can't unpack obd_statfs\n");
3674 GOTO(out, rc = -EPROTO);
3677 memcpy(osfs, msfs, sizeof(*osfs));
3681 ptlrpc_req_finished(req);
3685 /* Retrieve object striping information.
3687 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3688 * the maximum number of OST indices which will fit in the user buffer.
3689 * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
3691 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3693 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3694 struct lov_user_md_v3 lum, *lumk;
3695 int rc = 0, lum_size;
3696 struct lov_user_ost_data_v1 *lmm_objects;
3702 /* we only need the header part from user space to get lmm_magic and
3703 * lmm_stripe_count, (the header part is common to v1 and v3) */
3704 lum_size = sizeof(struct lov_user_md_v1);
3705 memset(&lum, 0x00, sizeof(lum));
3706 if (copy_from_user(&lum, lump, lum_size))
3709 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3710 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3713 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3714 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3715 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3716 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3718 /* we can use lov_mds_md_size() to compute lum_size
3719 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3720 if (lum.lmm_stripe_count > 0) {
3721 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3722 OBD_ALLOC(lumk, lum_size);
3725 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3726 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3728 lmm_objects = &(lumk->lmm_objects[0]);
3729 lmm_objects->l_object_id = lsm->lsm_object_id;
3731 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3735 lumk->lmm_magic = lum.lmm_magic;
3736 lumk->lmm_stripe_count = 1;
3737 lumk->lmm_object_id = lsm->lsm_object_id;
3739 if ((lsm->lsm_magic == LOV_USER_MAGIC_V1_SWABBED) ||
3740 (lsm->lsm_magic == LOV_USER_MAGIC_V3_SWABBED)) {
3741 /* lsm not in host order, so count also need be in same order */
3742 __swab32s(&lumk->lmm_magic);
3743 __swab16s(&lumk->lmm_stripe_count);
3744 lustre_swab_lov_user_md((struct lov_user_md_v1*)lumk);
3745 if (lum.lmm_stripe_count > 0)
3746 lustre_swab_lov_user_md_objects(
3747 (struct lov_user_md_v1*)lumk);
3750 if (copy_to_user(lump, lumk, lum_size))
3754 OBD_FREE(lumk, lum_size);
3760 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3761 void *karg, void *uarg)
3763 struct obd_device *obd = exp->exp_obd;
3764 struct obd_ioctl_data *data = karg;
3768 if (!try_module_get(THIS_MODULE)) {
3769 CERROR("Can't get module. Is it alive?");
3773 case OBD_IOC_LOV_GET_CONFIG: {
3775 struct lov_desc *desc;
3776 struct obd_uuid uuid;
3780 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3781 GOTO(out, err = -EINVAL);
3783 data = (struct obd_ioctl_data *)buf;
3785 if (sizeof(*desc) > data->ioc_inllen1) {
3786 obd_ioctl_freedata(buf, len);
3787 GOTO(out, err = -EINVAL);
3790 if (data->ioc_inllen2 < sizeof(uuid)) {
3791 obd_ioctl_freedata(buf, len);
3792 GOTO(out, err = -EINVAL);
3795 desc = (struct lov_desc *)data->ioc_inlbuf1;
3796 desc->ld_tgt_count = 1;
3797 desc->ld_active_tgt_count = 1;
3798 desc->ld_default_stripe_count = 1;
3799 desc->ld_default_stripe_size = 0;
3800 desc->ld_default_stripe_offset = 0;
3801 desc->ld_pattern = 0;
3802 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3804 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3806 err = copy_to_user((void *)uarg, buf, len);
3809 obd_ioctl_freedata(buf, len);
3812 case LL_IOC_LOV_SETSTRIPE:
3813 err = obd_alloc_memmd(exp, karg);
3817 case LL_IOC_LOV_GETSTRIPE:
3818 err = osc_getstripe(karg, uarg);
3820 case OBD_IOC_CLIENT_RECOVER:
3821 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3826 case IOC_OSC_SET_ACTIVE:
3827 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3830 case OBD_IOC_POLL_QUOTACHECK:
3831 err = lquota_poll_check(quota_interface, exp,
3832 (struct if_quotacheck *)karg);
3834 case OBD_IOC_DESTROY: {
3837 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3838 GOTO (out, err = -EPERM);
3839 oa = &data->ioc_obdo1;
3842 GOTO(out, err = -EINVAL);
3844 oa->o_valid |= OBD_MD_FLGROUP;
3846 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3849 case OBD_IOC_PING_TARGET:
3850 err = ptlrpc_obd_ping(obd);
3853 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3854 cmd, cfs_curproc_comm());
3855 GOTO(out, err = -ENOTTY);
3858 module_put(THIS_MODULE);
3862 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3863 void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3866 if (!vallen || !val)
3869 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3870 __u32 *stripe = val;
3871 *vallen = sizeof(*stripe);
3874 } else if (KEY_IS(KEY_OFF_RPCSIZE)) {
3875 struct client_obd *cli = &exp->exp_obd->u.cli;
3876 __u64 *rpcsize = val;
3877 LASSERT(*vallen == sizeof(__u64));
3878 *rpcsize = (__u64)cli->cl_max_pages_per_rpc;
3880 } else if (KEY_IS(KEY_LAST_ID)) {
3881 struct ptlrpc_request *req;
3883 char *bufs[2] = { NULL, key };
3884 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3887 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3888 OST_GET_INFO, 2, size, bufs);
3892 size[REPLY_REC_OFF] = *vallen;
3893 ptlrpc_req_set_repsize(req, 2, size);
3894 rc = ptlrpc_queue_wait(req);
3898 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3899 lustre_swab_ost_last_id);
3900 if (reply == NULL) {
3901 CERROR("Can't unpack OST last ID\n");
3902 GOTO(out, rc = -EPROTO);
3904 *((obd_id *)val) = *reply;
3906 ptlrpc_req_finished(req);
3908 } else if (KEY_IS(KEY_FIEMAP)) {
3909 struct ptlrpc_request *req;
3910 struct ll_user_fiemap *reply;
3911 char *bufs[2] = { NULL, key };
3912 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3915 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3916 OST_GET_INFO, 2, size, bufs);
3920 size[REPLY_REC_OFF] = *vallen;
3921 ptlrpc_req_set_repsize(req, 2, size);
3923 rc = ptlrpc_queue_wait(req);
3926 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3927 lustre_swab_fiemap);
3928 if (reply == NULL) {
3929 CERROR("Can't unpack FIEMAP reply.\n");
3930 GOTO(out1, rc = -EPROTO);
3933 memcpy(val, reply, *vallen);
3936 ptlrpc_req_finished(req);
3944 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3947 struct llog_ctxt *ctxt;
3948 struct obd_import *imp = req->rq_import;
3954 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3957 rc = llog_initiator_connect(ctxt);
3959 CERROR("cannot establish connection for "
3960 "ctxt %p: %d\n", ctxt, rc);
3963 llog_ctxt_put(ctxt);
3964 spin_lock(&imp->imp_lock);
3965 imp->imp_server_timeout = 1;
3966 imp->imp_pingable = 1;
3967 spin_unlock(&imp->imp_lock);
3968 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3973 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3974 void *key, obd_count vallen, void *val,
3975 struct ptlrpc_request_set *set)
3977 struct ptlrpc_request *req;
3978 struct obd_device *obd = exp->exp_obd;
3979 struct obd_import *imp = class_exp2cliimp(exp);
3980 __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3981 char *bufs[3] = { NULL, key, val };
3984 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3986 if (KEY_IS(KEY_NEXT_ID)) {
3987 if (vallen != sizeof(obd_id))
3989 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3990 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3991 exp->exp_obd->obd_name,
3992 obd->u.cli.cl_oscc.oscc_next_id);
3997 if (KEY_IS(KEY_UNLINKED)) {
3998 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3999 spin_lock(&oscc->oscc_lock);
4000 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4001 spin_unlock(&oscc->oscc_lock);
4005 if (KEY_IS(KEY_INIT_RECOV)) {
4006 if (vallen != sizeof(int))
4008 spin_lock(&imp->imp_lock);
4009 imp->imp_initial_recov = *(int *)val;
4010 spin_unlock(&imp->imp_lock);
4011 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
4012 exp->exp_obd->obd_name,
4013 imp->imp_initial_recov);
4017 if (KEY_IS(KEY_CHECKSUM)) {
4018 if (vallen != sizeof(int))
4020 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4024 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4027 /* We pass all other commands directly to OST. Since nobody calls osc
4028 methods directly and everybody is supposed to go through LOV, we
4029 assume lov checked invalid values for us.
4030 The only recognised values so far are evict_by_nid and mds_conn.
4031 Even if something bad goes through, we'd get a -EINVAL from OST
4034 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
4039 if (KEY_IS(KEY_MDS_CONN))
4040 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4041 else if (KEY_IS(KEY_GRANT_SHRINK))
4042 req->rq_interpret_reply = osc_shrink_grant_interpret;
4044 if (KEY_IS(KEY_GRANT_SHRINK)) {
4045 struct osc_grant_args *aa;
4048 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4049 aa = ptlrpc_req_async_args(req);
4052 ptlrpc_req_finished(req);
4055 *oa = ((struct ost_body *)val)->oa;
4059 ptlrpc_req_set_repsize(req, 2, size);
4060 ptlrpcd_add_req(req);
4062 ptlrpc_req_set_repsize(req, 1, NULL);
4063 ptlrpc_set_add_req(set, req);
4064 ptlrpc_check_set(set);
4071 static struct llog_operations osc_size_repl_logops = {
4072 lop_cancel: llog_obd_repl_cancel
4075 static struct llog_operations osc_mds_ost_orig_logops;
4076 static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
4079 struct llog_catid catid;
4080 static char name[32] = CATLIST;
4086 mutex_down(&disk_obd->obd_llog_cat_process);
4088 rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
4090 CERROR("rc: %d\n", rc);
4091 GOTO(out_unlock, rc);
4094 CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
4095 obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
4096 catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4099 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
4100 &catid.lci_logid, &osc_mds_ost_orig_logops);
4102 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4106 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
4107 &osc_size_repl_logops);
4109 struct llog_ctxt *ctxt =
4110 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4113 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4117 CERROR("osc '%s' tgt '%s' rc=%d\n",
4118 obd->obd_name, disk_obd->obd_name, rc);
4119 CERROR("logid "LPX64":0x%x\n",
4120 catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
4122 rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
4125 CERROR("rc: %d\n", rc);
4128 mutex_up(&disk_obd->obd_llog_cat_process);
4133 static int osc_llog_finish(struct obd_device *obd, int count)
4135 struct llog_ctxt *ctxt;
4136 int rc = 0, rc2 = 0;
4139 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4141 rc = llog_cleanup(ctxt);
4143 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4145 rc2 = llog_cleanup(ctxt);
4152 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
4153 struct obd_uuid *cluuid,
4154 struct obd_connect_data *data,
4157 struct client_obd *cli = &obd->u.cli;
4159 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4162 client_obd_list_lock(&cli->cl_loi_list_lock);
4163 data->ocd_grant = cli->cl_avail_grant ?:
4164 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4165 lost_grant = cli->cl_lost_grant;
4166 cli->cl_lost_grant = 0;
4167 client_obd_list_unlock(&cli->cl_loi_list_lock);
4169 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4170 "cl_lost_grant: %ld\n", data->ocd_grant,
4171 cli->cl_avail_grant, lost_grant);
4172 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4173 " ocd_grant: %d\n", data->ocd_connect_flags,
4174 data->ocd_version, data->ocd_grant);
4180 static int osc_disconnect(struct obd_export *exp)
4182 struct obd_device *obd = class_exp2obd(exp);
4183 struct llog_ctxt *ctxt;
4186 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4188 if (obd->u.cli.cl_conn_count == 1) {
4189 /* Flush any remaining cancel messages out to the
4191 llog_sync(ctxt, exp);
4193 llog_ctxt_put(ctxt);
4195 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4199 rc = client_disconnect_export(exp);
4201 * Initially we put del_shrink_grant before disconnect_export, but it
4202 * causes the following problem if setup (connect) and cleanup
4203 * (disconnect) are tangled together.
4204 * connect p1 disconnect p2
4205 * ptlrpc_connect_import
4206 * ............... class_manual_cleanup
4209 * ptlrpc_connect_interrupt
4211 * add this client to shrink list
4213 * Bang! pinger trigger the shrink.
4214 * So the osc should be disconnected from the shrink list, after we
4215 * are sure the import has been destroyed. BUG18662
4217 if (obd->u.cli.cl_import == NULL)
4218 osc_del_shrink_grant(&obd->u.cli);
4222 static int osc_import_event(struct obd_device *obd,
4223 struct obd_import *imp,
4224 enum obd_import_event event)
4226 struct client_obd *cli;
4230 LASSERT(imp->imp_obd == obd);
4233 case IMP_EVENT_DISCON: {
4234 /* Only do this on the MDS OSC's */
4235 if (imp->imp_server_timeout) {
4236 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4238 spin_lock(&oscc->oscc_lock);
4239 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4240 spin_unlock(&oscc->oscc_lock);
4243 client_obd_list_lock(&cli->cl_loi_list_lock);
4244 cli->cl_avail_grant = 0;
4245 cli->cl_lost_grant = 0;
4246 client_obd_list_unlock(&cli->cl_loi_list_lock);
4247 ptlrpc_import_setasync(imp, -1);
4251 case IMP_EVENT_INACTIVE: {
4252 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4255 case IMP_EVENT_INVALIDATE: {
4256 struct ldlm_namespace *ns = obd->obd_namespace;
4260 client_obd_list_lock(&cli->cl_loi_list_lock);
4261 /* all pages go to failing rpcs due to the invalid import */
4262 osc_check_rpcs(cli);
4263 client_obd_list_unlock(&cli->cl_loi_list_lock);
4265 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4269 case IMP_EVENT_ACTIVE: {
4270 /* Only do this on the MDS OSC's */
4271 if (imp->imp_server_timeout) {
4272 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4274 spin_lock(&oscc->oscc_lock);
4275 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4276 spin_unlock(&oscc->oscc_lock);
4278 CDEBUG(D_INFO, "notify server \n");
4279 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4282 case IMP_EVENT_OCD: {
4283 struct obd_connect_data *ocd = &imp->imp_connect_data;
4285 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4286 osc_init_grant(&obd->u.cli, ocd);
4289 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4290 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4292 ptlrpc_import_setasync(imp, 1);
4293 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4297 CERROR("Unknown import event %d\n", event);
4303 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
4309 rc = ptlrpcd_addref();
4313 rc = client_obd_setup(obd, len, buf);
4317 struct lprocfs_static_vars lvars = { 0 };
4318 struct client_obd *cli = &obd->u.cli;
4320 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4321 lprocfs_osc_init_vars(&lvars);
4322 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4323 lproc_osc_attach_seqstat(obd);
4324 ptlrpc_lprocfs_register_obd(obd);
4328 /* We need to allocate a few requests more, because
4329 brw_interpret tries to create new requests before freeing
4330 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4331 reserved, but I afraid that might be too much wasted RAM
4332 in fact, so 2 is just my guess and still should work. */
4333 cli->cl_import->imp_rq_pool =
4334 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4336 ptlrpc_add_rqs_to_pool);
4337 cli->cl_cache = cache_create(obd);
4338 if (!cli->cl_cache) {
4342 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4343 sema_init(&cli->cl_grant_sem, 1);
4349 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4355 case OBD_CLEANUP_EARLY: {
4356 struct obd_import *imp;
4357 imp = obd->u.cli.cl_import;
4358 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4359 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4360 ptlrpc_deactivate_import(imp);
4363 case OBD_CLEANUP_EXPORTS: {
4364 /* If we set up but never connected, the
4365 client import will not have been cleaned. */
4366 down_write(&obd->u.cli.cl_sem);
4367 if (obd->u.cli.cl_import) {
4368 struct obd_import *imp;
4369 imp = obd->u.cli.cl_import;
4370 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4372 ptlrpc_invalidate_import(imp);
4373 if (imp->imp_rq_pool) {
4374 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4375 imp->imp_rq_pool = NULL;
4377 class_destroy_import(imp);
4378 obd->u.cli.cl_import = NULL;
4380 up_write(&obd->u.cli.cl_sem);
4382 rc = obd_llog_finish(obd, 0);
4384 CERROR("failed to cleanup llogging subsystems\n");
4387 case OBD_CLEANUP_SELF_EXP:
4389 case OBD_CLEANUP_OBD:
4395 int osc_cleanup(struct obd_device *obd)
4400 ptlrpc_lprocfs_unregister_obd(obd);
4401 lprocfs_obd_cleanup(obd);
4403 /* free memory of osc quota cache */
4404 lquota_cleanup(quota_interface, obd);
4406 cache_destroy(obd->u.cli.cl_cache);
4407 rc = client_obd_cleanup(obd);
4413 static int osc_register_page_removal_cb(struct obd_device *obd,
4414 obd_page_removal_cb_t func,
4415 obd_pin_extent_cb pin_cb)
4419 /* this server - not need init */
4423 return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
4427 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4428 obd_page_removal_cb_t func)
4431 return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
4434 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4435 obd_lock_cancel_cb cb)
4438 LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4440 /* this server - not need init */
4444 obd->u.cli.cl_ext_lock_cancel_cb = cb;
4448 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4449 obd_lock_cancel_cb cb)
4453 if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4454 CERROR("Unregistering cancel cb %p, while only %p was "
4456 obd->u.cli.cl_ext_lock_cancel_cb);
4460 obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4464 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4466 struct lustre_cfg *lcfg = buf;
4467 struct lprocfs_static_vars lvars = { 0 };
4470 lprocfs_osc_init_vars(&lvars);
4472 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4476 struct obd_ops osc_obd_ops = {
4477 .o_owner = THIS_MODULE,
4478 .o_setup = osc_setup,
4479 .o_precleanup = osc_precleanup,
4480 .o_cleanup = osc_cleanup,
4481 .o_add_conn = client_import_add_conn,
4482 .o_del_conn = client_import_del_conn,
4483 .o_connect = client_connect_import,
4484 .o_reconnect = osc_reconnect,
4485 .o_disconnect = osc_disconnect,
4486 .o_statfs = osc_statfs,
4487 .o_statfs_async = osc_statfs_async,
4488 .o_packmd = osc_packmd,
4489 .o_unpackmd = osc_unpackmd,
4490 .o_precreate = osc_precreate,
4491 .o_create = osc_create,
4492 .o_create_async = osc_create_async,
4493 .o_destroy = osc_destroy,
4494 .o_getattr = osc_getattr,
4495 .o_getattr_async = osc_getattr_async,
4496 .o_setattr = osc_setattr,
4497 .o_setattr_async = osc_setattr_async,
4499 .o_brw_async = osc_brw_async,
4500 .o_prep_async_page = osc_prep_async_page,
4501 .o_get_lock = osc_get_lock,
4502 .o_queue_async_io = osc_queue_async_io,
4503 .o_set_async_flags = osc_set_async_flags,
4504 .o_queue_group_io = osc_queue_group_io,
4505 .o_trigger_group_io = osc_trigger_group_io,
4506 .o_teardown_async_page = osc_teardown_async_page,
4507 .o_punch = osc_punch,
4509 .o_enqueue = osc_enqueue,
4510 .o_match = osc_match,
4511 .o_change_cbdata = osc_change_cbdata,
4512 .o_cancel = osc_cancel,
4513 .o_cancel_unused = osc_cancel_unused,
4514 .o_join_lru = osc_join_lru,
4515 .o_iocontrol = osc_iocontrol,
4516 .o_get_info = osc_get_info,
4517 .o_set_info_async = osc_set_info_async,
4518 .o_import_event = osc_import_event,
4519 .o_llog_init = osc_llog_init,
4520 .o_llog_finish = osc_llog_finish,
4521 .o_process_config = osc_process_config,
4522 .o_register_page_removal_cb = osc_register_page_removal_cb,
4523 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4524 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4525 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4527 int __init osc_init(void)
4529 struct lprocfs_static_vars lvars = { 0 };
4533 lprocfs_osc_init_vars(&lvars);
4535 request_module("lquota");
4536 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4537 lquota_init(quota_interface);
4538 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4540 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4543 if (quota_interface)
4544 PORTAL_SYMBOL_PUT(osc_quota_interface);
4548 osc_mds_ost_orig_logops = llog_lvfs_ops;
4549 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4550 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4551 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4552 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4558 static void /*__exit*/ osc_exit(void)
4560 lquota_exit(quota_interface);
4561 if (quota_interface)
4562 PORTAL_SYMBOL_PUT(osc_quota_interface);
4564 class_unregister_type(LUSTRE_OSC_NAME);
4567 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4568 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4569 MODULE_LICENSE("GPL");
4571 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);