1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <libcfs/libcfs.h>
45 # include <liblustre.h>
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(const struct lu_env *env,
72 struct ptlrpc_request *req, void *data, int rc);
73 int osc_cleanup(struct obd_device *obd);
75 /* Pack OSC object metadata for disk storage (LE byte order). */
/* @exp is not referenced in the lines shown.  @lmmp is an in/out pointer to
 * the packed structure: it is both freed and allocated below, always with a
 * size of sizeof(**lmmp).  @lsm supplies the object id and group to pack.
 * NOTE(review): the conditions selecting the free/alloc paths and the
 * return value are not visible here -- confirm before relying on them. */
76 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
77 struct lov_stripe_md *lsm)
82 lmm_size = sizeof(**lmmp);
87 OBD_FREE(*lmmp, lmm_size);
93 OBD_ALLOC(*lmmp, lmm_size);
/* Both identifiers are asserted to be non-zero before they are stored. */
99 LASSERT(lsm->lsm_object_id);
100 LASSERT(lsm->lsm_object_gr);
/* Convert from CPU to little-endian byte order while storing. */
101 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
102 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
108 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* @lsmp is an in/out pointer to the in-memory stripe metadata, @lmm the
 * packed metadata (compared against NULL in the free path below) and
 * @lmm_bytes the size of the buffer @lmm points to.
 * NOTE(review): return values and the guards around the checks are not
 * visible in this listing. */
109 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
110 struct lov_mds_md *lmm, int lmm_bytes)
/* Buffers shorter than struct lov_mds_md are reported as an error. */
116 if (lmm_bytes < sizeof (*lmm)) {
117 CERROR("lov_mds_md too small: %d, need %d\n",
118 lmm_bytes, (int)sizeof(*lmm));
121 /* XXX LOV_MAGIC etc check? */
/* A zero object id is reported as an error. */
123 if (lmm->lmm_object_id == 0) {
124 CERROR("lov_mds_md: zero lmm_object_id\n");
/* lov_stripe_md_size(1): presumably the size of a one-stripe md -- verify. */
129 lsm_size = lov_stripe_md_size(1);
/* Existing md but nothing to unpack: release the oinfo, then the md. */
133 if (*lsmp != NULL && lmm == NULL) {
134 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
135 OBD_FREE(*lsmp, lsm_size);
141 OBD_ALLOC(*lsmp, lsm_size);
144 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* Undo the outer allocation when the oinfo allocation failed. */
145 if ((*lsmp)->lsm_oinfo[0] == NULL) {
146 OBD_FREE(*lsmp, lsm_size);
149 loi_init((*lsmp)->lsm_oinfo[0]);
153 /* XXX zero *lsmp? */
/* Convert the identifiers from little-endian to CPU byte order. */
154 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
155 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
156 LASSERT((*lsmp)->lsm_object_id);
157 LASSERT((*lsmp)->lsm_object_gr);
160 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Attach a capability to an outgoing request: look up the RMF_CAPA1 buffer
 * in the request capsule and flag the body with OBD_MD_FLOSSCAPA.
 * @capa is an opaque pointer that is treated as a struct obd_capa.
 * NOTE(review): the copy of the capability into the buffer and any NULL
 * check on @capa are not visible in this listing. */
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166 struct ost_body *body, void *capa)
168 struct obd_capa *oc = (struct obd_capa *)capa;
169 struct lustre_capa *c;
174 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
/* Tell the receiver that the request body carries a capability. */
177 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the OST body of @req from @oinfo: the obdo in oinfo->oi_oa is copied
 * by value into the request and oinfo->oi_capa is packed alongside it. */
181 static inline void osc_pack_req_body(struct ptlrpc_request *req,
182 struct obd_info *oinfo)
184 struct ost_body *body;
186 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* Structure copy: later changes to *oinfo->oi_oa do not reach the request. */
189 body->oa = *oinfo->oi_oa;
190 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side size of capability field @field in @req before the
 * request is packed.
 * NOTE(review): the remaining parameter(s) and the condition under which the
 * size is forced to 0 are not visible in this listing. */
193 static inline void osc_set_capa_size(struct ptlrpc_request *req,
194 const struct req_msg_field *field,
/* Size 0 for the field on the client side of the capsule. */
198 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
200 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for the getattr request built by osc_getattr_async().
 * Unpacks the ost_body from the reply, copies its obdo into
 * aa->aa_oi->oi_oa, and finally reports @rc through the oi_cb_up callback.
 * NOTE(review): the branch conditions between the success and failure paths
 * are not visible in this listing. */
204 static int osc_getattr_interpret(const struct lu_env *env,
205 struct ptlrpc_request *req,
206 struct osc_async_args *aa, int rc)
208 struct ost_body *body;
/* Fetch the reply body, byte-swapping it with lustre_swab_ost_body. */
214 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
215 lustre_swab_ost_body);
217 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
218 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
220 /* This should really be sent by the OST */
221 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
222 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
224 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* No attribute is marked valid on this path. */
226 aa->aa_oi->oi_oa->o_valid = 0;
/* The callback's result replaces rc. */
229 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR for the object described by @oinfo.
 * The request is added to @set; its reply is handled by
 * osc_getattr_interpret().
 * NOTE(review): error returns and the initialisation of the async args are
 * not visible in this listing. */
233 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
234 struct ptlrpc_request_set *set)
236 struct ptlrpc_request *req;
237 struct osc_async_args *aa;
241 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* The capability size is set before the request is packed. */
245 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
246 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
248 ptlrpc_request_free(req);
252 osc_pack_req_body(req, oinfo);
254 ptlrpc_request_set_replen(req);
255 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* The async args live inside the request, so they must fit there. */
257 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
258 aa = ptlrpc_req_async_args(req);
261 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send the request, wait for the reply and copy the
 * returned obdo into *oinfo->oi_oa.  A reply without an ost_body yields
 * -EPROTO.
 * NOTE(review): allocation/pack failure returns are not visible here. */
265 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
267 struct ptlrpc_request *req;
268 struct ost_body *body;
272 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
276 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
277 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
279 ptlrpc_request_free(req);
283 osc_pack_req_body(req, oinfo);
285 ptlrpc_request_set_replen(req);
/* Send the request and wait for the reply. */
287 rc = ptlrpc_queue_wait(req);
291 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
293 GOTO(out, rc = -EPROTO);
295 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
296 *oinfo->oi_oa = body->oa;
298 /* This should really be sent by the OST */
299 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
300 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Release the request. */
304 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the attributes in *oinfo->oi_oa, wait for
 * the reply and copy the returned obdo back into *oinfo->oi_oa.  A reply
 * without an ost_body yields -EPROTO.  @oti is not referenced in the lines
 * shown. */
308 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
309 struct obd_trans_info *oti)
311 struct ptlrpc_request *req;
312 struct ost_body *body;
/* When the group flag is set, the group number must be positive. */
316 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
317 oinfo->oi_oa->o_gr > 0);
319 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
323 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
324 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326 ptlrpc_request_free(req);
330 osc_pack_req_body(req, oinfo);
332 ptlrpc_request_set_replen(req);
334 rc = ptlrpc_queue_wait(req);
338 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340 GOTO(out, rc = -EPROTO);
342 *oinfo->oi_oa = body->oa;
346 ptlrpc_req_finished(req);
/* Reply interpreter for asynchronous setattr: copies the obdo from the reply
 * into aa->aa_oi->oi_oa (-EPROTO when the body is missing) and reports the
 * result through the oi_cb_up callback. */
350 static int osc_setattr_interpret(const struct lu_env *env,
351 struct ptlrpc_request *req,
352 struct osc_async_args *aa, int rc)
354 struct ost_body *body;
360 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362 GOTO(out, rc = -EPROTO);
364 *aa->aa_oi->oi_oa = body->oa;
/* The callback's result replaces rc. */
366 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.  Two submission paths are visible: the request
 * is either handed to ptlrpcd without waiting for a response, or given
 * osc_setattr_interpret() as reply handler and added to @rqset.
 * NOTE(review): the condition choosing between the two paths is not visible
 * in this listing. */
370 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
371 struct obd_trans_info *oti,
372 struct ptlrpc_request_set *rqset)
374 struct ptlrpc_request *req;
375 struct osc_async_args *aa;
379 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
383 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
384 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386 ptlrpc_request_free(req);
390 osc_pack_req_body(req, oinfo);
392 ptlrpc_request_set_replen(req);
/* NOTE(review): the cookie is stored in oinfo->oi_oa after
 * osc_pack_req_body() has already copied *oi_oa into the request body --
 * confirm that the cookie is meant to reach the wire. */
394 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
399 /* do mds to ost setattr asynchronously */
401 /* Do not wait for response. */
402 ptlrpcd_add_req(req);
404 req->rq_interpret_reply =
405 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* The async args live inside the request, so they must fit there. */
407 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
408 aa = ptlrpc_req_async_args(req);
411 ptlrpc_set_add_req(rqset, req);
/* Send a synchronous OST_CREATE for @oa and record the resulting object id
 * and group in the stripe metadata.  When @oa carries a log cookie it is
 * copied into @oti, and the reply's transaction number is stored in
 * oti->oti_transno.
 * NOTE(review): how *ea and the locally allocated lsm relate, and which
 * paths reach obd_free_memmd(), are not visible in this listing. */
417 int osc_real_create(struct obd_export *exp, struct obdo *oa,
418 struct lov_stripe_md **ea, struct obd_trans_info *oti)
420 struct ptlrpc_request *req;
421 struct ost_body *body;
422 struct lov_stripe_md *lsm;
431 rc = obd_alloc_memmd(exp, &lsm);
436 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
438 GOTO(out, rc = -ENOMEM);
440 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
442 ptlrpc_request_free(req);
446 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
450 ptlrpc_request_set_replen(req);
/* Orphan-deletion requests: o_flags is exactly OBD_FL_DELORPHAN. */
452 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
453 oa->o_flags == OBD_FL_DELORPHAN) {
455 "delorphan from OST integration");
456 /* Don't resend the delorphan req */
457 req->rq_no_resend = req->rq_no_delay = 1;
460 rc = ptlrpc_queue_wait(req);
464 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
466 GOTO(out_req, rc = -EPROTO);
470 /* This should really be sent by the OST */
471 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
472 oa->o_valid |= OBD_MD_FLBLKSZ;
474 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
475 * have valid lsm_oinfo data structs, so don't go touching that.
476 * This needs to be fixed in a big way.
478 lsm->lsm_object_id = oa->o_id;
479 lsm->lsm_object_gr = oa->o_gr;
483 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
485 if (oa->o_valid & OBD_MD_FLCOOKIE) {
486 if (!oti->oti_logcookies)
487 oti_alloc_cookies(oti, 1);
488 *oti->oti_logcookies = oa->o_lcookie;
492 CDEBUG(D_HA, "transno: "LPD64"\n",
493 lustre_msg_get_transno(req->rq_repmsg));
495 ptlrpc_req_finished(req);
498 obd_free_memmd(exp, &lsm);
/* Reply interpreter for the punch request built by osc_punch(): copies the
 * obdo from the reply into aa->aa_oi->oi_oa (-EPROTO when the body is
 * missing) and reports the result through the oi_cb_up callback. */
502 static int osc_punch_interpret(const struct lu_env *env,
503 struct ptlrpc_request *req,
504 struct osc_async_args *aa, int rc)
506 struct ost_body *body;
512 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514 GOTO(out, rc = -EPROTO);
516 *aa->aa_oi->oi_oa = body->oa;
/* The callback's result replaces rc. */
518 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_PUNCH for the extent in oinfo->oi_policy.
 * The extent start/end travel in the o_size/o_blocks fields of the request
 * obdo.  The request is added to @rqset and its reply is handled by
 * osc_punch_interpret().  @oti is not referenced in the lines shown. */
522 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
523 struct obd_trans_info *oti,
524 struct ptlrpc_request_set *rqset)
526 struct ptlrpc_request *req;
527 struct osc_async_args *aa;
528 struct ost_body *body;
533 CDEBUG(D_INFO, "oa NULL\n");
537 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
541 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
542 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
544 ptlrpc_request_free(req);
547 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
548 ptlrpc_at_set_req_timeout(req);
549 osc_pack_req_body(req, oinfo);
551 /* overload the size and blocks fields in the oa with start/end */
552 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
554 body->oa.o_size = oinfo->oi_policy.l_extent.start;
555 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
556 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
557 ptlrpc_request_set_replen(req);
560 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
/* The async args live inside the request, so they must fit there. */
561 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
562 aa = ptlrpc_req_async_args(req);
564 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_SYNC covering [@start, @end]; the range travels in the
 * o_size/o_blocks fields of the request obdo.  A reply without an ost_body
 * yields -EPROTO.  @md is not referenced in the lines shown.
 * NOTE(review): the trailing parameter(s) (a capability named "capa" is used
 * below) and the copy of @oa into the request are not visible here. */
569 static int osc_sync(struct obd_export *exp, struct obdo *oa,
570 struct lov_stripe_md *md, obd_size start, obd_size end,
573 struct ptlrpc_request *req;
574 struct ost_body *body;
579 CDEBUG(D_INFO, "oa NULL\n");
583 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
587 osc_set_capa_size(req, &RMF_CAPA1, capa);
588 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
590 ptlrpc_request_free(req);
594 /* overload the size and blocks fields in the oa with start/end */
595 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
598 body->oa.o_size = start;
599 body->oa.o_blocks = end;
600 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
601 osc_pack_capa(req, body, capa);
603 ptlrpc_request_set_replen(req);
605 rc = ptlrpc_queue_wait(req);
609 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
611 GOTO(out, rc = -EPROTO);
617 ptlrpc_req_finished(req);
621 /* Find and cancel locally locks matched by @mode in the resource found by
622 * @objid. Found locks are added into @cancel list. Returns the amount of
623 * locks added to @cancels list. */
/* The resource is looked up in the namespace of @exp's obd using a name
 * built from oa->o_id and oa->o_gr.
 * NOTE(review): the trailing parameter (used below as lock_flags) and the
 * handling of a failed resource lookup are not visible in this listing. */
624 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
625 struct list_head *cancels, ldlm_mode_t mode,
628 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
629 struct ldlm_res_id res_id;
630 struct ldlm_resource *res;
634 osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
635 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* ADDREF/DELREF bracket the cancel call; the putref then drops the
 * reference obtained from ldlm_resource_get(). */
639 LDLM_RESOURCE_ADDREF(res);
640 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
641 lock_flags, 0, NULL);
642 LDLM_RESOURCE_DELREF(res);
643 ldlm_resource_putref(res);
/* Reply interpreter for destroy requests: drops the in-flight destroy count
 * of the client obd behind req->rq_import and signals cl_destroy_waitq,
 * the queue osc_destroy() sleeps on when too many destroys are in flight. */
647 static int osc_destroy_interpret(const struct lu_env *env,
648 struct ptlrpc_request *req, void *data,
651 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
653 atomic_dec(&cli->cl_destroy_in_flight);
654 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve an in-flight slot for a destroy RPC.  The counter is
 * incremented first; if that exceeds cl_max_rpcs_in_flight the increment is
 * taken back.
 * NOTE(review): the return values of the two paths are not visible here. */
658 static int osc_can_send_destroy(struct client_obd *cli)
660 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
661 cli->cl_max_rpcs_in_flight) {
662 /* The destroy request can be sent */
/* Over the limit: undo the increment.  If the count is now below the limit,
 * signal the wait queue. */
665 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
666 cli->cl_max_rpcs_in_flight) {
668 * The counter has been modified between the two atomic
671 cfs_waitq_signal(&cli->cl_destroy_waitq);
676 /* Destroy requests can be async always on the client, and we don't even really
677 * care about the return code since the client cannot do anything at all about
679 * When the MDS is unlinking a filename, it saves the file objects into a
680 * recovery llog, and these object records are cancelled when the OST reports
681 * they were destroyed and sync'd to disk (i.e. transaction committed).
682 * If the client dies, or the OST is down when the object should be destroyed,
683 * the records are not cancelled, and when the OST reconnects to the MDS next,
684 * it will retrieve the llog unlink logs and then sends the log cancellation
685 * cookies to the MDS after committing destroy transactions. */
/* Implementation outline, as far as visible: unused PW locks on the object
 * are collected for cancellation, the OST_DESTROY request is prepared
 * through ldlm_prep_elc_req(), the sender is throttled by
 * osc_can_send_destroy(), and the request is handed to ptlrpcd.
 * @ea and @md_export are not referenced in the lines shown. */
686 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
687 struct lov_stripe_md *ea, struct obd_trans_info *oti,
688 struct obd_export *md_export)
690 struct client_obd *cli = &exp->exp_obd->u.cli;
691 struct ptlrpc_request *req;
692 struct ost_body *body;
693 CFS_LIST_HEAD(cancels);
698 CDEBUG(D_INFO, "oa NULL\n");
/* Gather PW locks on this object, passing LDLM_FL_DISCARD_DATA. */
702 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
703 LDLM_FL_DISCARD_DATA);
705 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Release the gathered locks again on this path. */
707 ldlm_lock_list_put(&cancels, l_bl_ast, count);
711 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
714 ptlrpc_request_free(req);
718 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
719 req->rq_interpret_reply = osc_destroy_interpret;
720 ptlrpc_at_set_req_timeout(req);
/* Copy the log cookie from @oti into @oa when the cookie flag is set. */
722 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
723 oa->o_lcookie = *oti->oti_logcookies;
724 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
728 ptlrpc_request_set_replen(req);
730 if (!osc_can_send_destroy(cli)) {
731 struct l_wait_info lwi = { 0 };
734 * Wait until the number of on-going destroy RPCs drops
735 * under max_rpc_in_flight
737 l_wait_event_exclusive(cli->cl_destroy_waitq,
738 osc_can_send_destroy(cli), &lwi);
741 /* Do not wait for response */
742 ptlrpcd_add_req(req);
/* Fill the cache/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from @cli while holding cl_loi_list_lock.  The lost
 * grant counter is reset to 0 once it has been reported in o_dropped.
 * NOTE(review): the trailing parameter and the assignments made in the
 * three error branches are not visible in this listing. */
746 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
749 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* Neither flag may already be set in o_valid. */
751 LASSERT(!(oa->o_valid & bits));
754 client_obd_list_lock(&cli->cl_loi_list_lock);
755 oa->o_dirty = cli->cl_dirty;
/* Consistency checks on the dirty accounting, each logged when it fails. */
756 if (cli->cl_dirty > cli->cl_dirty_max) {
757 CERROR("dirty %lu > dirty_max %lu\n",
758 cli->cl_dirty, cli->cl_dirty_max);
760 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
761 CERROR("dirty %d > system dirty_max %d\n",
762 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
764 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
765 CERROR("dirty %lu - dirty_max %lu too big???\n",
766 cli->cl_dirty, cli->cl_dirty_max);
/* Bytes for (max RPCs in flight + 1) RPCs of cl_max_pages_per_rpc pages. */
769 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
770 (cli->cl_max_rpcs_in_flight + 1);
771 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
773 oa->o_grant = cli->cl_avail_grant;
774 oa->o_dropped = cli->cl_lost_grant;
775 cli->cl_lost_grant = 0;
776 client_obd_list_unlock(&cli->cl_loi_list_lock);
777 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
778 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
781 /* caller must hold loi_list_lock */
/* Account one page of write grant to @pga: the global dirty page count and
 * cli->cl_dirty grow by one page, cl_avail_grant shrinks by one page, and
 * the page is tagged OBD_BRW_FROM_GRANT. */
782 static void osc_consume_write_grant(struct client_obd *cli,
783 struct brw_page *pga)
785 atomic_inc(&obd_dirty_pages);
786 cli->cl_dirty += CFS_PAGE_SIZE;
787 cli->cl_avail_grant -= CFS_PAGE_SIZE;
788 pga->flag |= OBD_BRW_FROM_GRANT;
789 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
790 CFS_PAGE_SIZE, pga, pga->pg);
/* The available grant must not go negative. */
791 LASSERT(cli->cl_avail_grant >= 0);
794 /* the companion to osc_consume_write_grant, called when a brw has completed.
795 * must be called with the loi lock held. */
/* Pages not tagged OBD_BRW_FROM_GRANT take the branch tested first below.
 * For the others the tag is cleared and the dirty accounting is reduced by
 * one page; grant that was not used is added to cl_lost_grant.
 * NOTE(review): the condition on @sent guarding the first lost-grant branch
 * is not visible in this listing. */
796 static void osc_release_write_grant(struct client_obd *cli,
797 struct brw_page *pga, int sent)
/* Uses 4096 when the cached os_bsize is 0. */
799 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
802 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
807 pga->flag &= ~OBD_BRW_FROM_GRANT;
808 atomic_dec(&obd_dirty_pages);
809 cli->cl_dirty -= CFS_PAGE_SIZE;
/* A whole page of grant is counted as lost here. */
811 cli->cl_lost_grant += CFS_PAGE_SIZE;
812 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
813 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
814 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
815 /* For short writes we shouldn't count parts of pages that
816 * span a whole block on the OST side, or our accounting goes
817 * wrong. Should match the code in filter_grant_check. */
/* offset: position of the write inside its page; count: write length
 * extended back to the start of its first block; end: position of the
 * write's end inside its last block. */
818 int offset = pga->off & ~CFS_PAGE_MASK;
819 int count = pga->count + (offset & (blocksize - 1));
820 int end = (offset + pga->count) & (blocksize - 1);
822 count += blocksize - end;
824 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
825 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
826 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
827 cli->cl_avail_grant, cli->cl_dirty);
/* Total number of RPCs in flight for @cli: the sum of cl_r_in_flight and
 * cl_w_in_flight. */
833 static unsigned long rpcs_in_flight(struct client_obd *cli)
835 return cli->cl_r_in_flight + cli->cl_w_in_flight;
838 /* caller must hold loi_list_lock */
/* Walk cli->cl_cache_waiters and wake waiters.  Each woken waiter is removed
 * from the list and either has a page of grant consumed on its behalf or is
 * given -EDQUOT as its result.
 * NOTE(review): how the two "cannot proceed" checks leave the loop is not
 * visible in this listing. */
839 void osc_wake_cache_waiters(struct client_obd *cli)
841 struct list_head *l, *tmp;
842 struct osc_cache_waiter *ocw;
/* _safe iteration: entries are unlinked inside the loop. */
845 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
846 /* if we can't dirty more, we must wait until some is written */
847 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
848 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
849 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
850 "osc max %ld, sys max %d\n", cli->cl_dirty,
851 cli->cl_dirty_max, obd_max_dirty_pages);
855 /* if still dirty cache but no grant wait for pending RPCs that
856 * may yet return us some grant before doing sync writes */
857 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
858 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
859 cli->cl_w_in_flight);
863 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
864 list_del_init(&ocw->ocw_entry);
865 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
866 /* no more RPCs in flight to return grant, do sync IO */
867 ocw->ocw_rc = -EDQUOT;
868 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* Consume a page of grant for the waiter's page. */
870 osc_consume_write_grant(cli,
871 &ocw->ocw_oap->oap_brw_page);
874 cfs_waitq_signal(&ocw->ocw_waitq);
/* Set the client's available grant to the ocd_grant value of the connect
 * data @ocd.  The assignment is made under cl_loi_list_lock; the debug
 * message and the assertion run after the lock is dropped. */
880 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
882 client_obd_list_lock(&cli->cl_loi_list_lock);
883 cli->cl_avail_grant = ocd->ocd_grant;
884 client_obd_list_unlock(&cli->cl_loi_list_lock);
886 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
887 cli->cl_avail_grant, cli->cl_lost_grant);
888 LASSERT(cli->cl_avail_grant >= 0);
/* Add the grant carried in a reply body to cli->cl_avail_grant, under
 * cl_loi_list_lock.  The grant is only added when the body marks it valid
 * with OBD_MD_FLGRANT; the debug message prints o_grant unconditionally. */
891 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
893 client_obd_list_lock(&cli->cl_loi_list_lock);
894 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
895 if (body->oa.o_valid & OBD_MD_FLGRANT)
896 cli->cl_avail_grant += body->oa.o_grant;
897 /* waiters are woken in brw_interpret */
898 client_obd_list_unlock(&cli->cl_loi_list_lock);
901 /* We assume that the reason this OSC got a short read is because it read
902 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
903 * via the LOV, and it _knows_ it's reading inside the file, it's just that
904 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the part of the page array that the short read did not cover.
 * @nob_read is the number of bytes actually read, @page_count the number of
 * entries in @pga.
 * NOTE(review): the declaration and stepping of the page index i, and the
 * exit from the first loop, are not visible in this listing. */
905 static void handle_short_read(int nob_read, obd_count page_count,
906 struct brw_page **pga)
911 /* skip bytes read OK */
912 while (nob_read > 0) {
913 LASSERT (page_count > 0);
915 if (pga[i]->count > nob_read) {
916 /* EOF inside this page */
/* Map the page and zero from the end of the data to the end of the
 * page's I/O range. */
917 ptr = cfs_kmap(pga[i]->pg) +
918 (pga[i]->off & ~CFS_PAGE_MASK);
919 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
920 cfs_kunmap(pga[i]->pg);
926 nob_read -= pga[i]->count;
931 /* zero remaining pages */
932 while (page_count-- > 0) {
933 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
934 memset(ptr, 0, pga[i]->count);
935 cfs_kunmap(pga[i]->pg);
/* Validate the reply to a bulk write: fetch the vector of @niocount per-
 * niobuf return codes, return the first negative one, log any non-zero
 * positive one, and compare the bulk byte count against @requested_nob.
 * @page_count and @pga are not referenced in the lines shown.
 * NOTE(review): the return values of the error paths are not visible. */
940 static int check_write_rcs(struct ptlrpc_request *req,
941 int requested_nob, int niocount,
942 obd_count page_count, struct brw_page **pga)
946 /* return error if any niobuf was in error */
/* NOTE(review): the reply buffer is indexed with REQ_REC_OFF here while
 * other reply lookups in this file use REPLY_REC_OFF -- confirm the two
 * constants agree. */
947 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
948 sizeof(*remote_rcs) * niocount, NULL);
949 if (remote_rcs == NULL) {
950 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
/* No swab callback was passed above, so swab each rc by hand if needed. */
953 if (lustre_msg_swabbed(req->rq_repmsg))
954 for (i = 0; i < niocount; i++)
955 __swab32s(&remote_rcs[i]);
957 for (i = 0; i < niocount; i++) {
958 if (remote_rcs[i] < 0)
959 return(remote_rcs[i]);
961 if (remote_rcs[i] != 0) {
962 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
963 i, remote_rcs[i], req);
968 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
969 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
970 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Decide whether @p1 and @p2 can share one remote niobuf.  When the flags
 * match, the answer is whether @p2 starts exactly where @p1 ends.  When
 * they differ in anything besides OBD_BRW_FROM_GRANT a warning is logged.
 * NOTE(review): the value returned from the differing-flags branch is not
 * visible in this listing. */
977 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
979 if (p1->flag != p2->flag) {
980 unsigned mask = ~OBD_BRW_FROM_GRANT;
982 /* warn if we try to combine flags that we don't know to be
984 if ((p1->flag & mask) != (p2->flag & mask))
985 CERROR("is it ok to have flags 0x%x and 0x%x in the "
986 "same brw?\n", p1->flag, p2->flag);
990 return (p1->off + p1->count == p2->off);
/* Compute a checksum of type @cksum_type over the first @nob bytes described
 * by the @pg_count pages in @pga.  @opc (OST_READ / OST_WRITE) only selects
 * which fault-injection hook applies.
 * NOTE(review): the stepping of the page index i and of pg_count, and what
 * the OBD_FAIL_OSC_CHECKSUM_SEND branch does to the result, are not visible
 * in this listing. */
993 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
994 struct brw_page **pga, int opc,
995 cksum_type_t cksum_type)
1000 LASSERT (pg_count > 0);
1001 cksum = init_checksum(cksum_type);
1002 while (nob > 0 && pg_count > 0) {
1003 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1004 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* Clamp the last page's contribution to the bytes still wanted. */
1005 int count = pga[i]->count > nob ? nob : pga[i]->count;
1007 /* corrupt the data before we compute the checksum, to
1008 * simulate an OST->client data error */
1009 if (i == 0 && opc == OST_READ &&
1010 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1011 memcpy(ptr + off, "bad1", min(4, nob));
1012 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1013 cfs_kunmap(pga[i]->pg);
1014 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1017 nob -= pga[i]->count;
1021 /* For sending we only compute the wrong checksum instead
1022 * of corrupting the data so it is still correct on a redo */
1023 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a bulk read or write request for the @page_count pages in @pga.
 * @cmd selects write when OBD_BRW_WRITE is set.  Contiguous pages with
 * compatible flags are merged into one remote niobuf (see
 * can_merge_pages()).  The request parameters are saved in the request's
 * async args for the completion path.  @reqp is presumably where the built
 * request is handed back -- verify, the assignment is not visible.  @lsm is
 * not referenced in the lines shown.
 * NOTE(review): the selection of opc, the copy of @oa into the request body
 * and the success return are not visible in this listing. */
1029 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1030 struct lov_stripe_md *lsm, obd_count page_count,
1031 struct brw_page **pga,
1032 struct ptlrpc_request **reqp,
1033 struct obd_capa *ocapa)
1035 struct ptlrpc_request *req;
1036 struct ptlrpc_bulk_desc *desc;
1037 struct ost_body *body;
1038 struct obd_ioobj *ioobj;
1039 struct niobuf_remote *niobuf;
1040 int niocount, i, requested_nob, opc, rc;
1041 struct osc_brw_async_args *aa;
1042 struct req_capsule *pill;
1043 struct brw_page *pg_prev;
/* Fault-injection hooks for the two failure classes. */
1046 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1047 RETURN(-ENOMEM); /* Recoverable */
1048 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1049 RETURN(-EINVAL); /* Fatal */
/* Writes allocate from the import's request pool. */
1051 if ((cmd & OBD_BRW_WRITE) != 0) {
1053 req = ptlrpc_request_alloc_pool(cli->cl_import,
1054 cli->cl_import->imp_rq_pool,
1058 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* Count the remote niobufs: one, plus one more for every page that cannot
 * be merged with its predecessor (presumably -- the increment itself is not
 * visible). */
1064 for (niocount = i = 1; i < page_count; i++) {
1065 if (!can_merge_pages(pga[i - 1], pga[i]))
1069 pill = &req->rq_pill;
1070 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1071 niocount * sizeof(*niobuf));
1072 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1074 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1076 ptlrpc_request_free(req);
1079 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1080 ptlrpc_at_set_req_timeout(req);
/* Writes prepare a BULK_GET_SOURCE descriptor, reads a BULK_PUT_SINK one. */
1082 if (opc == OST_WRITE)
1083 desc = ptlrpc_prep_bulk_imp(req, page_count,
1084 BULK_GET_SOURCE, OST_BULK_PORTAL);
1086 desc = ptlrpc_prep_bulk_imp(req, page_count,
1087 BULK_PUT_SINK, OST_BULK_PORTAL);
1090 GOTO(out, rc = -ENOMEM);
1091 /* NB request now owns desc and will free it when it gets freed */
1093 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1094 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1095 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1096 LASSERT(body && ioobj && niobuf);
1100 obdo_to_ioobj(oa, ioobj);
1101 ioobj->ioo_bufcnt = niocount;
1102 osc_pack_capa(req, body, ocapa);
1103 LASSERT (page_count > 0);
/* Add every page to the bulk descriptor and build the niobuf array. */
1105 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1106 struct brw_page *pg = pga[i];
1108 LASSERT(pg->count > 0);
/* The I/O range must not cross the end of its page. */
1109 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1110 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1111 pg->off, pg->count);
/* Pages must arrive in strictly increasing offset order.  Two variants of
 * the same assertion follow; presumably they sit in alternative
 * preprocessor branches -- verify. */
1113 LASSERTF(i == 0 || pg->off > pg_prev->off,
1114 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1115 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1117 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1118 pg_prev->pg, page_private(pg_prev->pg),
1119 pg_prev->pg->index, pg_prev->off);
1121 LASSERTF(i == 0 || pg->off > pg_prev->off,
1122 "i %d p_c %u\n", i, page_count);
/* All pages must agree with the first page on OBD_BRW_SRVLOCK. */
1124 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1125 (pg->flag & OBD_BRW_SRVLOCK));
1127 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1129 requested_nob += pg->count;
/* Mergeable page: grow a niobuf's length; otherwise start a new niobuf.
 * NOTE(review): the niobuf pointer adjustment in the merge branch is not
 * visible in this listing. */
1131 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1133 niobuf->len += pg->count;
1135 niobuf->offset = pg->off;
1136 niobuf->len = pg->count;
1137 niobuf->flags = pg->flag;
/* After the loop niobuf must sit exactly niocount entries past the start
 * of the niobuf buffer in the request message. */
1142 LASSERTF((void *)(niobuf - niocount) ==
1143 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1144 niocount * sizeof(*niobuf)),
1145 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1146 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1147 (void *)(niobuf - niocount));
1149 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1151 /* size[REQ_REC_OFF] still sizeof (*body) */
1152 if (opc == OST_WRITE) {
/* Checksum only when enabled on the client and the security flavour has no
 * bulk hash of its own. */
1153 if (unlikely(cli->cl_checksum) &&
1154 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1155 /* store cl_cksum_type in a local variable since
1156 * it can be changed via lprocfs */
1157 cksum_type_t cksum_type = cli->cl_cksum_type;
1159 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1160 oa->o_flags = body->oa.o_flags = 0;
1161 body->oa.o_flags |= cksum_type_pack(cksum_type);
1162 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1163 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1167 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1169 /* save this in 'oa', too, for later checking */
1170 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1171 oa->o_flags |= cksum_type_pack(cksum_type);
1173 /* clear out the checksum flag, in case this is a
1174 * resend but cl_checksum is no longer set. b=11238 */
1175 oa->o_valid &= ~OBD_MD_FLCKSUM;
1177 oa->o_cksum = body->oa.o_cksum;
1178 /* 1 RC per niobuf */
1179 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1180 sizeof(__u32) * niocount);
/* The read path only requests a checksum; no checksum is computed here. */
1182 if (unlikely(cli->cl_checksum) &&
1183 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1184 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1185 body->oa.o_flags = 0;
1186 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1187 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1189 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1190 /* 1 RC for the whole I/O */
1192 ptlrpc_request_set_replen(req);
/* The async args live inside the request, so they must fit there. */
1194 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1195 aa = ptlrpc_req_async_args(req);
1197 aa->aa_requested_nob = requested_nob;
1198 aa->aa_nio_count = niocount;
1199 aa->aa_page_count = page_count;
1203 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1209 ptlrpc_req_finished(req);
/* Compare the checksum the client sent with a write (@client_cksum) against
 * the one the server returned (@server_cksum).  When they differ, the
 * checksum is recomputed over the pages and the mismatch is classified and
 * reported on the console.  @oa is the obdo from the reply, @peer the remote
 * peer, @client_cksum_type the type the client originally used.
 * NOTE(review): the return values of the match and mismatch paths are not
 * visible in this listing. */
1213 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1214 __u32 client_cksum, __u32 server_cksum, int nob,
1215 obd_count page_count, struct brw_page **pga,
1216 cksum_type_t client_cksum_type)
1220 cksum_type_t cksum_type;
1222 if (server_cksum == client_cksum) {
1223 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Use the checksum type from the reply flags; CRC32 when none is given. */
1227 if (oa->o_valid & OBD_MD_FLFLAGS)
1228 cksum_type = cksum_type_unpack(oa->o_flags);
1230 cksum_type = OBD_CKSUM_CRC32;
/* Recompute over the current page contents to classify the mismatch. */
1232 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1235 if (cksum_type != client_cksum_type)
1236 msg = "the server did not use the checksum type specified in "
1237 "the original request - likely a protocol problem";
1238 else if (new_cksum == server_cksum)
1239 msg = "changed on the client after we checksummed it - "
1240 "likely false positive due to mmap IO (bug 11742)";
1241 else if (new_cksum == client_cksum)
1242 msg = "changed in transit before arrival at OST";
1244 msg = "changed in transit AND doesn't match the original - "
1245 "likely false positive due to mmap IO (bug 11742)";
1247 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1248 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1249 "["LPU64"-"LPU64"]\n",
1250 msg, libcfs_nid2str(peer->nid),
1251 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1252 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1255 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1257 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1258 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1259 "client csum now %x\n", client_cksum, client_cksum_type,
1260 server_cksum, cksum_type, new_cksum);
/* Finish processing a bulk read/write (BRW) RPC once a result is available.
 *
 * The reply body is unpacked, quota flags carried by a write reply are passed
 * to lquota_setdq(), and osc_update_grant() is called with the reply body.
 * Write replies then have their checksum and per-buffer return codes checked;
 * read replies have the byte count sanity-checked against what was requested
 * and what the bulk layer transferred, short reads are handled, and a
 * server-supplied checksum is compared with one recomputed locally. */
1264 /* Note rc enters this function as number of bytes transferred */
1265 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1267 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1268 const lnet_process_id_t *peer =
1269 &req->rq_import->imp_connection->c_peer;
1270 struct client_obd *cli = aa->aa_cli;
1271 struct ost_body *body;
1272 __u32 client_cksum = 0;
/* -EDQUOT is the only error that still gets its reply examined below.
 * NOTE(review): other errors are presumably handed back unchanged -- confirm */
1275 if (rc < 0 && rc != -EDQUOT)
1278 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1279 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1280 lustre_swab_ost_body);
1282 CDEBUG(D_INFO, "Can't unpack body\n");
1286 /* set/clear over quota flag for a uid/gid */
1287 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1288 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1289 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1290 body->oa.o_gid, body->oa.o_valid,
1296 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1297 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1299 osc_update_grant(cli, body);
/* write replies: verify the checksum we sent and the per-buffer rcs */
1301 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1303 CERROR("Unexpected +ve rc %d\n", rc);
1306 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1308 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1309 check_write_checksum(&body->oa, peer, client_cksum,
1310 body->oa.o_cksum, aa->aa_requested_nob,
1311 aa->aa_page_count, aa->aa_ppga,
1312 cksum_type_unpack(aa->aa_oa->o_flags)))
1315 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1318 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1319 aa->aa_page_count, aa->aa_ppga);
1323 /* The rest of this function executes only for OST_READs */
1324 if (rc > aa->aa_requested_nob) {
1325 CERROR("Unexpected rc %d (%d requested)\n", rc,
1326 aa->aa_requested_nob);
1330 if (rc != req->rq_bulk->bd_nob_transferred) {
1331 CERROR ("Unexpected rc %d (%d transferred)\n",
1332 rc, req->rq_bulk->bd_nob_transferred);
1336 if (rc < aa->aa_requested_nob)
1337 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1339 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1341 GOTO(out, rc = -EAGAIN);
/* the server supplied a checksum: recompute it over the rc bytes received
 * and compare the two */
1343 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1344 static int cksum_counter;
1345 __u32 server_cksum = body->oa.o_cksum;
1348 cksum_type_t cksum_type;
1350 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1351 cksum_type = cksum_type_unpack(body->oa.o_flags);
1353 cksum_type = OBD_CKSUM_CRC32;
1354 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1355 aa->aa_ppga, OST_READ,
1358 if (peer->nid == req->rq_bulk->bd_sender) {
1362 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1365 if (server_cksum == ~0 && rc > 0) {
1366 CERROR("Protocol error: server %s set the 'checksum' "
1367 "bit, but didn't send a checksum. Not fatal, "
1368 "but please notify on http://bugzilla.lustre.org/\n",
1369 libcfs_nid2str(peer->nid));
1370 } else if (server_cksum != client_cksum) {
1371 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1372 "%s%s%s inum "LPU64"/"LPU64" object "
1373 LPU64"/"LPU64" extent "
1374 "["LPU64"-"LPU64"]\n",
1375 req->rq_import->imp_obd->obd_name,
1376 libcfs_nid2str(peer->nid),
1378 body->oa.o_valid & OBD_MD_FLFID ?
1379 body->oa.o_fid : (__u64)0,
1380 body->oa.o_valid & OBD_MD_FLFID ?
1381 body->oa.o_generation :(__u64)0,
1383 body->oa.o_valid & OBD_MD_FLGROUP ?
1384 body->oa.o_gr : (__u64)0,
1385 aa->aa_ppga[0]->off,
1386 aa->aa_ppga[aa->aa_page_count-1]->off +
1387 aa->aa_ppga[aa->aa_page_count-1]->count -
1389 CERROR("client %x, server %x, cksum_type %x\n",
1390 client_cksum, server_cksum, cksum_type);
1392 aa->aa_oa->o_cksum = client_cksum;
1396 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* the request carried a checksum (client_cksum was saved from aa_oa above)
 * but the reply has none */
1399 } else if (unlikely(client_cksum)) {
1400 static int cksum_missed;
/* x & -x isolates the lowest set bit, so this is true only when
 * cksum_missed is zero or a power of two */
1403 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1404 CERROR("Checksum %u requested from %s but not sent\n",
1405 cksum_missed, libcfs_nid2str(peer->nid));
/* copy the obdo from the reply over the caller's obdo */
1411 *aa->aa_oa = body->oa;
/* Synchronous BRW of page_count pages: the request is built by
 * osc_brw_prep_request(), sent with ptlrpc_queue_wait() and its result
 * post-processed by osc_brw_fini_request().
 *
 * When the result is a recoverable error and osc_should_resend() still
 * allows it, the thread sleeps on a private wait queue with a timeout of
 * `resends` seconds.  NOTE(review): presumably the request is then rebuilt
 * and sent again -- confirm. */
1416 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1417 struct lov_stripe_md *lsm,
1418 obd_count page_count, struct brw_page **pga,
1419 struct obd_capa *ocapa)
1421 struct ptlrpc_request *req;
1425 struct l_wait_info lwi;
1429 cfs_waitq_init(&waitq);
1432 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1433 page_count, pga, &req, ocapa);
1437 rc = ptlrpc_queue_wait(req);
/* a timed-out request that ptlrpc flagged for resend is dropped here */
1439 if (rc == -ETIMEDOUT && req->rq_resend) {
1440 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1441 ptlrpc_req_finished(req);
1445 rc = osc_brw_fini_request(req, rc);
1447 ptlrpc_req_finished(req);
1448 if (osc_recoverable_error(rc)) {
1450 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1451 CERROR("too many resend retries, returning error\n");
/* the wait condition is the constant 0, so this only returns on timeout
 * or interruption */
1455 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1456 l_wait_event(waitq, 0, &lwi);
/* Re-issue an asynchronous BRW request that failed with a recoverable error.
 *
 * A new request is prepared from the same obdo and page array, takes over
 * the interpret callback, the async args and the list of oaps of the old
 * request, and is added to the old request's request set.  Nothing is
 * resent once osc_should_resend() refuses, or (as the visible branch shows)
 * when an oap holding a request reference has been interrupted, in which
 * case the new request is dropped again. */
1464 int osc_brw_redo_request(struct ptlrpc_request *request,
1465 struct osc_brw_async_args *aa)
1467 struct ptlrpc_request *new_req;
1468 struct ptlrpc_request_set *set = request->rq_set;
1469 struct osc_brw_async_args *new_aa;
1470 struct osc_async_page *oap;
1474 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1475 CERROR("too many resend retries, returning error\n");
1479 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1481 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1482 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1483 ocapa = lustre_unpack_capa(request->rq_reqmsg,
/* NOTE(review): the capability unpacked into ocapa just above is not passed
 * on; osc_brw_prep_request() is given NULL for its ocapa argument -- confirm
 * this is intended */
1486 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1487 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1488 aa->aa_cli, aa->aa_oa,
1489 NULL /* lsm unused by osc currently */,
1490 aa->aa_page_count, aa->aa_ppga,
1491 &new_req, NULL /* ocapa */);
1495 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1497 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1498 if (oap->oap_request != NULL) {
1499 LASSERTF(request == oap->oap_request,
1500 "request %p != oap_request %p\n",
1501 request, oap->oap_request);
1502 if (oap->oap_interrupted) {
1503 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1504 ptlrpc_req_finished(new_req);
1509 /* New request takes over pga and oaps from old request.
1510 * Note that copying a list_head doesn't work, need to move it... */
1512 new_req->rq_interpret_reply = request->rq_interpret_reply;
1513 new_req->rq_async_args = request->rq_async_args;
/* rq_sent is set aa_resends seconds into the future.
 * NOTE(review): presumably this delays the resend -- confirm */
1514 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1516 new_aa = ptlrpc_req_async_args(new_req);
/* the struct copy above duplicated the list head by value; re-initialise it
 * and move the entries over properly */
1518 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1519 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1520 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
/* oaps that held a reference on the old request now reference the new one */
1522 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1523 if (oap->oap_request) {
1524 ptlrpc_req_finished(oap->oap_request);
1525 oap->oap_request = ptlrpc_request_addref(new_req);
1529 /* use ptlrpc_set_add_req is safe because interpret functions work
1530 * in check_set context. only one way exist with access to request
1531 * from different thread got -EINTR - this way protected with
1532 * cl_loi_list_lock */
1533 ptlrpc_set_add_req(set, new_req);
1535 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1537 DEBUG_REQ(D_INFO, new_req, "new request");
/* Queue a single BRW RPC for page_count pages on the caller's request set
 * without waiting for it; brw_interpret() is installed as the reply callback.
 *
 * For writes, grant is consumed per page before the request is prepared.
 * The trailing branch releases that grant again and wakes cache waiters.
 * NOTE(review): presumably that branch is the preparation-failure path --
 * confirm. */
1541 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1542 struct lov_stripe_md *lsm, obd_count page_count,
1543 struct brw_page **pga, struct ptlrpc_request_set *set,
1544 struct obd_capa *ocapa)
1546 struct ptlrpc_request *req;
1547 struct client_obd *cli = &exp->exp_obd->u.cli;
1549 struct osc_brw_async_args *aa;
1552 /* Consume write credits even if doing a sync write -
1553 * otherwise we may run out of space on OST due to grant. */
1554 if (cmd == OBD_BRW_WRITE) {
/* NOTE(review): cl_loi_list_lock is taken with spin_lock() here but with
 * client_obd_list_lock() further down in this function -- confirm the two
 * are interchangeable for this lock */
1555 spin_lock(&cli->cl_loi_list_lock);
1556 for (i = 0; i < page_count; i++) {
1557 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1558 osc_consume_write_grant(cli, pga[i]);
1560 spin_unlock(&cli->cl_loi_list_lock);
1563 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1566 aa = ptlrpc_req_async_args(req);
/* statistics: pages per RPC and RPCs in flight, per direction */
1567 if (cmd == OBD_BRW_READ) {
1568 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1569 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1571 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1572 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1573 cli->cl_w_in_flight);
1575 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
/* an empty aa_oaps is how brw_interpret() recognises requests from here */
1577 LASSERT(list_empty(&aa->aa_oaps));
1579 req->rq_interpret_reply = brw_interpret;
1580 ptlrpc_set_add_req(set, req);
1581 client_obd_list_lock(&cli->cl_loi_list_lock);
1582 if (cmd == OBD_BRW_READ)
1583 cli->cl_r_in_flight++;
1585 cli->cl_w_in_flight++;
1586 client_obd_list_unlock(&cli->cl_loi_list_lock);
1587 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1588 } else if (cmd == OBD_BRW_WRITE) {
1589 client_obd_list_lock(&cli->cl_loi_list_lock);
1590 for (i = 0; i < page_count; i++)
1591 osc_release_write_grant(cli, pga[i], 0);
1592 osc_wake_cache_waiters(cli);
1593 client_obd_list_unlock(&cli->cl_loi_list_lock);
1599 * ugh, we want disk allocation on the target to happen in offset order. we'll
1600 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1601 * fine for our small page arrays and doesn't require allocation. it's an
1602 * insertion sort that swaps elements that are strides apart, shrinking the
1603 * stride down until it's '1' and the array is sorted.
/* Sort the num brw_page pointers in array in place by ascending ->off.
 * Strides are generated with stride = stride * 3 + 1; each pass is an
 * insertion sort over elements that are stride apart, and the passes stop
 * once the stride-1 pass has run. */
1605 static void sort_brw_pages(struct brw_page **array, int num)
1608 struct brw_page *tmp;
1612 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1617 for (i = stride ; i < num ; i++) {
1620 while (j >= stride && array[j - stride]->off > tmp->off) {
1621 array[j] = array[j - stride];
1626 } while (stride > 1);
/* Count how many leading entries of pg[] (at most `pages`, which must be
 * non-zero) form a run without gaps inside it: the run stops after a page
 * that does not end on a page boundary.
 * NOTE(review): presumably it also stops before a following page that does
 * not start on a page boundary -- confirm. */
1629 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1635 LASSERT (pages > 0);
/* offset of the data within its page */
1636 offset = pg[i]->off & ~CFS_PAGE_MASK;
1640 if (pages == 0) /* that's all */
1643 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1644 return count; /* doesn't end on page boundary */
1647 offset = pg[i]->off & ~CFS_PAGE_MASK;
1648 if (offset != 0) /* doesn't start on page boundary */
/* Allocate an array of count brw_page pointers for the pages in pga.
 * NOTE(review): presumably entry i is set to point at pga[i] and NULL is
 * returned when the allocation fails -- confirm.
 * The array is released with osc_release_ppga(). */
1655 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1657 struct brw_page **ppga;
1660 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1664 for (i = 0; i < count; i++)
/* Free a brw_page pointer array of count entries.  ppga must not be NULL;
 * count must be the entry count the array was allocated with, because the
 * freed size is derived from it. */
1669 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1671 LASSERT(ppga != NULL);
1672 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Synchronous BRW entry point for an array of page_count pages.
 *
 * With OBD_BRW_CHECK set in cmd only the import is tested.  Otherwise the
 * pages are sorted by offset and sent through osc_brw_internal() in chunks
 * of at most cl_max_pages_per_rpc pages, each chunk further limited by
 * max_unfragmented_pages().  Since every RPC overwrites the obdo, a copy is
 * saved when more than one chunk is needed and restored before each later
 * chunk. */
1675 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1676 obd_count page_count, struct brw_page *pga,
1677 struct obd_trans_info *oti)
1679 struct obdo *saved_oa = NULL;
1680 struct brw_page **ppga, **orig;
1681 struct obd_import *imp = class_exp2cliimp(exp);
1682 struct client_obd *cli = &imp->imp_obd->u.cli;
1683 int rc, page_count_orig;
1686 if (cmd & OBD_BRW_CHECK) {
1687 /* The caller just wants to know if there's a chance that this
1688 * I/O can succeed */
1690 if (imp == NULL || imp->imp_invalid)
1695 /* test_brw with a failed create can trip this, maybe others. */
1696 LASSERT(cli->cl_max_pages_per_rpc);
/* orig keeps the start of the array for the final free; ppga advances */
1700 orig = ppga = osc_build_ppga(pga, page_count);
1703 page_count_orig = page_count;
1705 sort_brw_pages(ppga, page_count);
1706 while (page_count) {
1707 obd_count pages_per_brw;
1709 if (page_count > cli->cl_max_pages_per_rpc)
1710 pages_per_brw = cli->cl_max_pages_per_rpc;
1712 pages_per_brw = page_count;
1714 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1716 if (saved_oa != NULL) {
1717 /* restore previously saved oa */
1718 *oinfo->oi_oa = *saved_oa;
1719 } else if (page_count > pages_per_brw) {
1720 /* save a copy of oa (brw will clobber it) */
1721 OBDO_ALLOC(saved_oa);
1722 if (saved_oa == NULL)
1723 GOTO(out, rc = -ENOMEM);
1724 *saved_oa = *oinfo->oi_oa;
1727 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1728 pages_per_brw, ppga, oinfo->oi_capa);
1733 page_count -= pages_per_brw;
1734 ppga += pages_per_brw;
1738 osc_release_ppga(orig, page_count_orig);
1740 if (saved_oa != NULL)
1741 OBDO_FREE(saved_oa);
/* Asynchronous counterpart of osc_brw(): the pages are sorted and chunked
 * the same way, but every chunk is queued on `set` through async_internal().
 *
 * Unless the whole I/O fits in one RPC, each chunk is given its own copy of
 * its slice of the pointer array.  The visible code frees that copy in one
 * branch after async_internal() and otherwise leaves it to async_internal()
 * (see the comment below). */
1746 static int osc_brw_async(int cmd, struct obd_export *exp,
1747 struct obd_info *oinfo, obd_count page_count,
1748 struct brw_page *pga, struct obd_trans_info *oti,
1749 struct ptlrpc_request_set *set)
1751 struct brw_page **ppga, **orig;
1752 struct client_obd *cli = &exp->exp_obd->u.cli;
1753 int page_count_orig;
1757 if (cmd & OBD_BRW_CHECK) {
1758 struct obd_import *imp = class_exp2cliimp(exp);
1759 /* The caller just wants to know if there's a chance that this
1760 * I/O can succeed */
1762 if (imp == NULL || imp->imp_invalid)
1767 orig = ppga = osc_build_ppga(pga, page_count);
1770 page_count_orig = page_count;
1772 sort_brw_pages(ppga, page_count);
1773 while (page_count) {
1774 struct brw_page **copy;
1775 obd_count pages_per_brw;
1777 pages_per_brw = min_t(obd_count, page_count,
1778 cli->cl_max_pages_per_rpc);
1780 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1782 /* use ppga only if single RPC is going to fly */
1783 if (pages_per_brw != page_count_orig || ppga != orig) {
1784 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1786 GOTO(out, rc = -ENOMEM);
1787 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1791 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1792 pages_per_brw, copy, set, oinfo->oi_capa);
1796 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1800 /* we passed it to async_internal() which is
1801 * now responsible for releasing memory */
1805 page_count -= pages_per_brw;
1806 ppga += pages_per_brw;
1810 osc_release_ppga(orig, page_count_orig);
/* Forward declaration: brw_interpret() calls this before its definition
 * further down in this file. */
1814 static void osc_check_rpcs(struct client_obd *cli);
1816 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1817 * the dirty accounting. Writeback completes or truncate happens before
1818 * writing starts. Must be called with the loi lock held. */
/* The visible work is releasing the write grant held for the oap's brw page;
 * `sent` is passed through unchanged to osc_release_write_grant(). */
1819 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1822 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1826 /* This maintains the lists of pending pages to read/write for a given object
1827 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1828 * to quickly find objects that are ready to send an RPC. */
/* Decide whether the pages pending on lop justify building an RPC now.
 * The conditions tested, in order: anything pending at all, an invalid or
 * missing import, a queued urgent page, cache waiters (writes only), and
 * finally at least `optimal` pending pages.
 * NOTE(review): presumably returns non-zero when an RPC should be built --
 * confirm. */
1829 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1835 if (lop->lop_num_pending == 0)
1838 /* if we have an invalid import we want to drain the queued pages
1839 * by forcing them through rpcs that immediately fail and complete
1840 * the pages. recovery relies on this to empty the queued pages
1841 * before canceling the locks and evicting down the llite pages */
1842 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1845 /* stream rpcs in queue order as long as there is an urgent page
1846 * queued. this is our cheap solution for good batching in the case
1847 * where writepage marks some random page in the middle of the file
1848 * as urgent because of, say, memory pressure */
1849 if (!list_empty(&lop->lop_urgent)) {
1850 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1853 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1854 optimal = cli->cl_max_pages_per_rpc;
1855 if (cmd & OBD_BRW_WRITE) {
1856 /* trigger a write rpc stream as long as there are dirtiers
1857 * waiting for space. as they're waiting, they're not going to
1858 * create more pages to coalesce with what's waiting.. */
1859 if (!list_empty(&cli->cl_cache_waiters)) {
1860 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1863 /* +16 to avoid triggering rpcs that would want to include pages
1864 * that are being queued but which can't be made ready until
1865 * the queuer finishes with the page. this is a wart for
1866 * llite::commit_write() */
1869 if (lop->lop_num_pending >= optimal)
/* Bring item's list membership in line with should_be_on: an unlinked item
 * that should be on the list is appended to its tail, a linked item that
 * should not be is unlinked and re-initialised.  Otherwise nothing changes. */
1875 static void on_list(struct list_head *item, struct list_head *list,
1878 if (list_empty(item) && should_be_on)
1879 list_add_tail(item, list);
1880 else if (!list_empty(item) && !should_be_on)
1881 list_del_init(item);
1884 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1885 * can find pages to build into rpcs quickly */
/* Three memberships are updated: the ready list (either direction would make
 * an rpc according to lop_makes_rpc()), the write list (write pages are
 * pending) and the read list (read pages are pending). */
1886 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1888 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1889 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1890 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1892 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1893 loi->loi_write_lop.lop_num_pending);
1895 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1896 loi->loi_read_lop.lop_num_pending);
/* Add delta (which may be negative) to lop's pending-page count and to the
 * matching client-wide counter: cl_pending_w_pages when cmd has
 * OBD_BRW_WRITE set, cl_pending_r_pages otherwise. */
1899 static void lop_update_pending(struct client_obd *cli,
1900 struct loi_oap_pages *lop, int cmd, int delta)
1902 lop->lop_num_pending += delta;
1903 if (cmd & OBD_BRW_WRITE)
1904 cli->cl_pending_w_pages += delta;
1906 cli->cl_pending_r_pages += delta;
1909 /* this is called when a sync waiter receives an interruption. Its job is to
1910 * get the caller woken as soon as possible. If its page hasn't been put in an
1911 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1912 * desiring interruption which will forcefully complete the rpc once the rpc
/* Interruption callback for the oap that embeds occ.
 *
 * Under cl_loi_list_lock the oap is flagged interrupted.  If it already
 * holds a request reference, that request is marked interrupted and ptlrpcd
 * is woken.  If instead it is still on a pending list, it is removed from
 * the pending and urgent lists, the pending accounting and list membership
 * are updated, and its group-io entry is completed with -EINTR. */
1914 static void osc_occ_interrupted(struct oig_callback_context *occ)
1916 struct osc_async_page *oap;
1917 struct loi_oap_pages *lop;
1918 struct lov_oinfo *loi;
1921 /* XXX member_of() */
1922 oap = list_entry(occ, struct osc_async_page, oap_occ);
1924 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1926 oap->oap_interrupted = 1;
1928 /* ok, it's been put in an rpc. only one oap gets a request reference */
1929 if (oap->oap_request != NULL) {
1930 ptlrpc_mark_interrupted(oap->oap_request);
1931 ptlrpcd_wake(oap->oap_request);
1935 /* we don't get interruption callbacks until osc_trigger_group_io()
1936 * has been called and put the sync oaps in the pending/urgent lists.*/
1937 if (!list_empty(&oap->oap_pending_item)) {
1938 list_del_init(&oap->oap_pending_item);
1939 list_del_init(&oap->oap_urgent_item);
1942 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1943 &loi->loi_write_lop : &loi->loi_read_lop;
1944 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1945 loi_list_maint(oap->oap_cli, oap->oap_loi);
1947 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1948 oap->oap_oig = NULL;
1952 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1955 /* this is trying to propagate async writeback errors back up to the
1956 * application. As an async write fails we record the error code for later if
1957 * the app does an fsync. As long as errors persist we force future rpcs to be
1958 * sync so that the app can get a sync error and break the cycle of queueing
1959 * pages for which writeback will fail. */
/* Update the async-write error record ar for a completed write with
 * transfer id xid.
 * Visible effects: ar_force_sync is set and ar_min_xid is sampled from
 * ptlrpc_sample_next_xid(); ar_force_sync is cleared again once a request
 * with xid >= ar_min_xid is processed.
 * NOTE(review): presumably the first happens on error and the second on
 * success -- confirm. */
1960 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1967 ar->ar_force_sync = 1;
1968 ar->ar_min_xid = ptlrpc_sample_next_xid();
1973 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1974 ar->ar_force_sync = 0;
/* Queue oap on its object's pending pages: the write lop when oap_cmd has
 * OBD_BRW_WRITE set, the read lop otherwise.  An ASYNC_URGENT page is also
 * put at the head of the urgent list.  The page goes to the tail of the
 * pending list and the pending counters are raised by one. */
1977 static void osc_oap_to_pending(struct osc_async_page *oap)
1979 struct loi_oap_pages *lop;
1981 if (oap->oap_cmd & OBD_BRW_WRITE)
1982 lop = &oap->oap_loi->loi_write_lop;
1984 lop = &oap->oap_loi->loi_read_lop;
1986 if (oap->oap_async_flags & ASYNC_URGENT)
1987 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1988 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1989 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
/* Complete one async page with status rc.
 *
 * The oap's request reference (if any) is dropped, its async flags and
 * interrupted state are cleared, and for writes the client-wide and
 * per-object async error records are updated.  When rc is 0 and an obdo is
 * supplied, blocks and m/a/ctime flagged valid in it are copied into the
 * object's lvb.  The page then either completes its group-io entry or is
 * reported to the caller's ap_completion hook.
 * NOTE(review): presumably the group-io path is taken when oap_oig is set --
 * confirm. */
1992 /* this must be called holding the loi list lock to give coverage to exit_cache,
1993 * async_flag maintenance, and oap_request */
1994 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1995 struct osc_async_page *oap, int sent, int rc)
2000 if (oap->oap_request != NULL) {
2001 xid = ptlrpc_req_xid(oap->oap_request);
2002 ptlrpc_req_finished(oap->oap_request);
2003 oap->oap_request = NULL;
2006 oap->oap_async_flags = 0;
2007 oap->oap_interrupted = 0;
2009 if (oap->oap_cmd & OBD_BRW_WRITE) {
2010 osc_process_ar(&cli->cl_ar, xid, rc);
2011 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2014 if (rc == 0 && oa != NULL) {
2015 if (oa->o_valid & OBD_MD_FLBLOCKS)
2016 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2017 if (oa->o_valid & OBD_MD_FLMTIME)
2018 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2019 if (oa->o_valid & OBD_MD_FLATIME)
2020 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2021 if (oa->o_valid & OBD_MD_FLCTIME)
2022 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2026 osc_exit_cache(cli, oap, sent);
2027 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2028 oap->oap_oig = NULL;
2033 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2034 oap->oap_cmd, oa, rc);
2036 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2037 * I/O on the page could start, but OSC calls it under lock
2038 * and thus we can add oap back to pending safely */
2040 /* upper layer wants to leave the page on pending queue */
2041 osc_oap_to_pending(oap);
2043 osc_exit_cache(cli, oap, sent);
/* Reply callback for the BRW requests queued by async_internal() and
 * osc_send_oap_rpc().
 *
 * The result is first run through osc_brw_fini_request(); a recoverable
 * error leads to osc_brw_redo_request().  Otherwise, under cl_loi_list_lock,
 * the in-flight counter of the request's direction is dropped and the pages
 * are finished: requests carrying oaps complete each oap and free the obdo,
 * requests without oaps release the write grant of every page.  Cache
 * waiters are then woken, osc_check_rpcs() is run, and the page pointer
 * array is freed. */
2047 static int brw_interpret(const struct lu_env *env,
2048 struct ptlrpc_request *req, void *data, int rc)
2050 struct osc_brw_async_args *aa = data;
2051 struct client_obd *cli;
2054 rc = osc_brw_fini_request(req, rc);
2055 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2056 if (osc_recoverable_error(rc)) {
2057 rc = osc_brw_redo_request(req, aa);
2064 client_obd_list_lock(&cli->cl_loi_list_lock);
2066 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2067 * is called so we know whether to go to sync BRWs or wait for more
2068 * RPCs to complete */
2069 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2070 cli->cl_w_in_flight--;
2072 cli->cl_r_in_flight--;
2074 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2075 struct osc_async_page *oap, *tmp;
2076 /* the caller may re-use the oap after the completion call so
2077 * we need to clean it up a little */
2078 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2079 list_del_init(&oap->oap_rpc_item);
2080 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2082 OBDO_FREE(aa->aa_oa);
2083 } else { /* from async_internal() */
2085 for (i = 0; i < aa->aa_page_count; i++)
2086 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2088 osc_wake_cache_waiters(cli);
2089 osc_check_rpcs(cli);
2090 client_obd_list_unlock(&cli->cl_loi_list_lock);
2092 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/* Build one BRW request for the async pages linked on rpc_list.
 *
 * A page pointer array of page_count entries is allocated and filled from
 * the oaps, the caller's ops fill the obdo and look up the capability, the
 * pages are sorted by offset and osc_brw_prep_request() creates the request.
 * The obdo timestamps are refreshed after the request exists (see the bug
 * 10150 note below).  Finally rpc_list is moved onto the request's aa_oaps
 * and left empty.
 *
 * Returns the request, or an ERR_PTR() value: -ENOMEM or the error from
 * osc_brw_prep_request().  rpc_list must not be empty. */
2096 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2097 struct list_head *rpc_list,
2098 int page_count, int cmd)
2100 struct ptlrpc_request *req;
2101 struct brw_page **pga = NULL;
2102 struct osc_brw_async_args *aa;
2103 struct obdo *oa = NULL;
2104 struct obd_async_page_ops *ops = NULL;
2105 void *caller_data = NULL;
2106 struct obd_capa *ocapa;
2107 struct osc_async_page *oap;
2108 struct ldlm_lock *lock = NULL;
2112 LASSERT(!list_empty(rpc_list));
2114 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2116 RETURN(ERR_PTR(-ENOMEM));
2120 GOTO(out, req = ERR_PTR(-ENOMEM));
2123 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2125 ops = oap->oap_caller_ops;
2126 caller_data = oap->oap_caller_data;
2127 lock = oap->oap_ldlm_lock;
/* the page's file offset is the object offset plus the offset in the page */
2129 pga[i] = &oap->oap_brw_page;
2130 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2131 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2132 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2136 /* always get the data for the obdo for the rpc */
2137 LASSERT(ops != NULL);
2138 ops->ap_fill_obdo(caller_data, cmd, oa);
2139 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2141 oa->o_handle = lock->l_remote_handle;
2142 oa->o_valid |= OBD_MD_FLHANDLE;
2145 sort_brw_pages(pga, page_count);
2146 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2150 CERROR("prep_req failed: %d\n", rc);
2151 GOTO(out, req = ERR_PTR(rc));
2154 /* Need to update the timestamps after the request is built in case
2155 * we race with setattr (locally or in queue at OST). If OST gets
2156 * later setattr before earlier BRW (as determined by the request xid),
2157 * the OST will not use BRW timestamps. Sadly, there is no obvious
2158 * way to do this in a single call. bug 10150 */
2159 ops->ap_update_obdo(caller_data, cmd, oa,
2160 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
/* compile-time check that the async args fit in the request's scratch space */
2162 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2163 aa = ptlrpc_req_async_args(req);
2164 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2165 list_splice(rpc_list, &aa->aa_oaps);
2166 CFS_INIT_LIST_HEAD(rpc_list);
2173 OBD_FREE(pga, sizeof(*pga) * page_count);
2178 /* the loi lock is held across this function but it's allowed to release
2179 * and reacquire it during its work */
2181 * prepare pages for ASYNC io and put pages in send queue.
2185 * \param cmd - OBD_BRW_* macros
2186 * \param lop - pending pages
2188 * \return zero if pages were successfully added to the send queue.
2189 * \return non-zero if an error occurred.
/* Collect pages from lop's pending list into one BRW RPC and hand it to
 * ptlrpcd.
 *
 * Pages are taken in list order.  Collection stops at a page whose SRVLOCK
 * flag differs from the first page's, at a page with a gap at its start or
 * end, when cl_max_pages_per_rpc pages have been gathered, or when the RPC
 * would end on a PTLRPC_MAX_BRW_SIZE boundary.  A page whose count is <= 0
 * is completed on the spot with that count as its status instead of being
 * sent.
 *
 * The list lock is released around osc_build_req().  If that fails, every
 * collected page is completed with an error and PTR_ERR(req) is returned. */
2191 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2192 int cmd, struct loi_oap_pages *lop)
2194 struct ptlrpc_request *req;
2195 obd_count page_count = 0;
2196 struct osc_async_page *oap = NULL, *tmp;
2197 struct osc_brw_async_args *aa;
2198 struct obd_async_page_ops *ops;
2199 CFS_LIST_HEAD(rpc_list);
2200 unsigned int ending_offset;
2201 unsigned starting_offset = 0;
2205 /* first we find the pages we're allowed to work with */
2206 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2208 ops = oap->oap_caller_ops;
2210 LASSERT(oap->oap_magic == OAP_MAGIC);
2212 if (page_count != 0 &&
2213 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2214 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2215 " oap %p, page %p, srvlock %u\n",
2216 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2219 /* in llite being 'ready' equates to the page being locked
2220 * until completion unlocks it. commit_write submits a page
2221 * as not ready because its unlock will happen unconditionally
2222 * as the call returns. if we race with commit_write giving
2223 * us that page we don't want to create a hole in the page
2224 * stream, so we stop and leave the rpc to be fired by
2225 * another dirtier or kupdated interval (the not ready page
2226 * will still be on the dirty list). we could call in
2227 * at the end of ll_file_write to process the queue again. */
2228 if (!(oap->oap_async_flags & ASYNC_READY)) {
2229 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2231 CDEBUG(D_INODE, "oap %p page %p returned %d "
2232 "instead of ready\n", oap,
2236 /* llite is telling us that the page is still
2237 * in commit_write and that we should try
2238 * and put it in an rpc again later. we
2239 * break out of the loop so we don't create
2240 * a hole in the sequence of pages in the rpc
2245 /* the io isn't needed.. tell the checks
2246 * below to complete the rpc with EINTR */
2247 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2248 oap->oap_count = -EINTR;
2251 oap->oap_async_flags |= ASYNC_READY;
2254 LASSERTF(0, "oap %p page %p returned %d "
2255 "from make_ready\n", oap,
2263 * Page submitted for IO has to be locked. Either by
2264 * ->ap_make_ready() or by higher layers.
2266 #if defined(__KERNEL__) && defined(__linux__)
2267 if(!(PageLocked(oap->oap_page) &&
2268 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2269 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2270 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2274 /* If there is a gap at the start of this page, it can't merge
2275 * with any previous page, so we'll hand the network a
2276 * "fragmented" page array that it can't transfer in 1 RDMA */
2277 if (page_count != 0 && oap->oap_page_off != 0)
2280 /* take the page out of our book-keeping */
2281 list_del_init(&oap->oap_pending_item);
2282 lop_update_pending(cli, lop, cmd, -1);
2283 list_del_init(&oap->oap_urgent_item);
2285 if (page_count == 0)
2286 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2287 (PTLRPC_MAX_BRW_SIZE - 1);
2289 /* ask the caller for the size of the io as the rpc leaves. */
2290 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2292 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2293 if (oap->oap_count <= 0) {
2294 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2296 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2300 /* now put the page back in our accounting */
2301 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2302 if (page_count == 0)
2303 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2304 if (++page_count >= cli->cl_max_pages_per_rpc)
2307 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2308 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2309 * have the same alignment as the initial writes that allocated
2310 * extents on the server. */
2311 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2312 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2313 if (ending_offset == 0)
2316 /* If there is a gap at the end of this page, it can't merge
2317 * with any subsequent pages, so we'll hand the network a
2318 * "fragmented" page array that it can't transfer in 1 RDMA */
2319 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2323 osc_wake_cache_waiters(cli);
2325 if (page_count == 0)
2328 loi_list_maint(cli, loi);
/* the list lock is dropped while the request is built */
2330 client_obd_list_unlock(&cli->cl_loi_list_lock);
2332 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2334 /* this should happen rarely and is pretty bad, it makes the
2335 * pending list not follow the dirty order */
2336 client_obd_list_lock(&cli->cl_loi_list_lock);
2337 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2338 list_del_init(&oap->oap_rpc_item);
2340 /* queued sync pages can be torn down while the pages
2341 * were between the pending list and the rpc */
2342 if (oap->oap_interrupted) {
2343 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2344 osc_ap_completion(cli, NULL, oap, 0,
2348 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2350 loi_list_maint(cli, loi);
2351 RETURN(PTR_ERR(req));
2354 aa = ptlrpc_req_async_args(req);
2356 if (cmd == OBD_BRW_READ) {
2357 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2358 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2359 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2360 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2362 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2363 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2364 cli->cl_w_in_flight);
2365 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2366 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2368 ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2370 client_obd_list_lock(&cli->cl_loi_list_lock);
2372 if (cmd == OBD_BRW_READ)
2373 cli->cl_r_in_flight++;
2375 cli->cl_w_in_flight++;
2377 /* queued sync pages can be torn down while the pages
2378 * were between the pending list and the rpc */
2380 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2381 /* only one oap gets a request reference */
2384 if (oap->oap_interrupted && !req->rq_intr) {
2385 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2387 ptlrpc_mark_interrupted(req);
2391 tmp->oap_request = ptlrpc_request_addref(req);
2393 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2394 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2396 req->rq_interpret_reply = brw_interpret;
2397 ptlrpcd_add_req(req);
/* Debug helper: logs at D_INODE whether LOI is on a list through
 * loi_cli_item and, for its write and its read lop, the pending page count
 * and whether the urgent list is non-empty.  STR is appended to the format
 * string. */
2401 #define LOI_DEBUG(LOI, STR, args...) \
2402 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2403 !list_empty(&(LOI)->loi_cli_item), \
2404 (LOI)->loi_write_lop.lop_num_pending, \
2405 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2406 (LOI)->loi_read_lop.lop_num_pending, \
2407 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2410 /* This is called by osc_check_rpcs() to find which objects have pages that
2411 * we could be sending. These lists are maintained by loi_list_maint(). */
/* Pick the next object to build RPCs for.  The candidates are tried in this
 * order: the head of the ready list; if there are cache waiters, the head of
 * the write list; if the import is missing or invalid, the head of the write
 * list and then of the read list.
 * NOTE(review): presumably NULL is returned when none applies (the caller
 * loops until it gets NULL) -- confirm. */
2412 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2415 /* first return all objects which we already know to have
2416 * pages ready to be stuffed into rpcs */
2417 if (!list_empty(&cli->cl_loi_ready_list))
2418 RETURN(list_entry(cli->cl_loi_ready_list.next,
2419 struct lov_oinfo, loi_cli_item));
2421 /* then if we have cache waiters, return all objects with queued
2422 * writes. This is especially important when many small files
2423 * have filled up the cache and not been fired into rpcs because
2424 * they don't pass the nr_pending/object threshold */
2425 if (!list_empty(&cli->cl_cache_waiters) &&
2426 !list_empty(&cli->cl_loi_write_list))
2427 RETURN(list_entry(cli->cl_loi_write_list.next,
2428 struct lov_oinfo, loi_write_item));
2430 /* then return all queued objects when we have an invalid import
2431 * so that they get flushed */
2432 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2433 if (!list_empty(&cli->cl_loi_write_list))
2434 RETURN(list_entry(cli->cl_loi_write_list.next,
2435 struct lov_oinfo, loi_write_item));
2436 if (!list_empty(&cli->cl_loi_read_list))
2437 RETURN(list_entry(cli->cl_loi_read_list.next,
2438 struct lov_oinfo, loi_read_item));
/* Main dispatch loop: keeps taking the object returned by osc_next_loi() and,
 * where lop_makes_rpc() agrees, calls osc_send_oap_rpc() for its write lop
 * and then for its read lop.  After that the object is unlinked from the
 * loi_cli_item, loi_write_item and loi_read_item lists and loi_list_maint()
 * is called for it.
 * NOTE(review): several statements (the handling of rc, the updates of
 * race_counter and the loop exits) are not visible in this excerpt. */
2443 /* called with the loi list lock held */
2444 static void osc_check_rpcs(struct client_obd *cli)
2446 struct lov_oinfo *loi;
2447 int rc = 0, race_counter = 0;
2450 while ((loi = osc_next_loi(cli)) != NULL) {
2451 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2453 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2456 /* attempt some read/write balancing by alternating between
2457 * reads and writes in an object. The makes_rpc checks here
2458 * would be redundant if we were getting read/write work items
2459 * instead of objects. we don't want send_oap_rpc to drain a
2460 * partial read pending queue when we're given this object to
2461 * do io on writes while there are cache waiters */
2462 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2463 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2464 &loi->loi_write_lop);
2472 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2473 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2474 &loi->loi_read_lop);
2483 /* attempt some inter-object balancing by issuing rpcs
2484 * for each object in turn */
2485 if (!list_empty(&loi->loi_cli_item))
2486 list_del_init(&loi->loi_cli_item);
2487 if (!list_empty(&loi->loi_write_item))
2488 list_del_init(&loi->loi_write_item);
2489 if (!list_empty(&loi->loi_read_item))
2490 list_del_init(&loi->loi_read_item);
2492 loi_list_maint(cli, loi);
2494 /* send_oap_rpc fails with 0 when make_ready tells it to
2495 * back off. llite's make_ready does this when it tries
2496 * to lock a page queued for write that is already locked.
2497 * we want to try sending rpcs from many objects, but we
2498 * don't want to spin failing with 0. */
2499 if (race_counter == 10)
2505 /* we're trying to queue a page in the osc so we're subject to the
2506 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2507 * If the osc's queued pages are already at that limit, then we want to sleep
2508 * until there is space in the osc's queue for us. We also may be waiting for
2509 * write credits from the OST if there are RPCs in flight that may return some
2510 * before we fall back to sync writes.
2512 * We need this to know our allocation was granted in the presence of signals */
/* Wake-up predicate passed to l_wait_event() by osc_enter_cache(): with
 * cl_loi_list_lock held, rc becomes true once ocw->ocw_entry is no longer
 * linked on a list or once rpcs_in_flight(cli) is 0.
 * NOTE(review): the declaration and return of rc are not visible in this
 * excerpt. */
2513 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2517 client_obd_list_lock(&cli->cl_loi_list_lock);
2518 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2519 client_obd_list_unlock(&cli->cl_loi_list_lock);
2523 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2524 * grant or cache space. */
/* Visible flow: log the dirty/grant counters; take the sync-io path when
 * cl_dirty_max is below one page or either ar_force_sync flag is set; in the
 * normal case (room under cl_dirty_max and obd_max_dirty_pages, and at least
 * one page of cl_avail_grant) account the page with
 * osc_consume_write_grant(); otherwise, if write RPCs are in flight, queue an
 * on-stack osc_cache_waiter on cl_cache_waiters, run osc_check_rpcs(), drop
 * the list lock and sleep until ocw_granted() is true, then retake the lock
 * and unlink the waiter if it is still queued.
 * NOTE(review): the return statements of each branch are not visible in this
 * excerpt. */
2525 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2526 struct osc_async_page *oap)
2528 struct osc_cache_waiter ocw;
2529 struct l_wait_info lwi = { 0 };
2533 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2534 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2535 cli->cl_dirty_max, obd_max_dirty_pages,
2536 cli->cl_lost_grant, cli->cl_avail_grant);
2538 /* force the caller to try sync io. this can jump the list
2539 * of queued writes and create a discontiguous rpc stream */
2540 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2541 loi->loi_ar.ar_force_sync)
2544 /* Hopefully normal case - cache space and write credits available */
2545 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2546 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2547 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2548 /* account for ourselves */
2549 osc_consume_write_grant(cli, &oap->oap_brw_page);
2553 /* Make sure that there are write rpcs in flight to wait for. This
2554 * is a little silly as this object may not have any pending but
2555 * other objects sure might. */
2556 if (cli->cl_w_in_flight) {
2557 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2558 cfs_waitq_init(&ocw.ocw_waitq);
2562 loi_list_maint(cli, loi);
2563 osc_check_rpcs(cli);
2564 client_obd_list_unlock(&cli->cl_loi_list_lock);
2566 CDEBUG(D_CACHE, "sleeping for cache space\n");
2567 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2569 client_obd_list_lock(&cli->cl_loi_list_lock);
2570 if (!list_empty(&ocw.ocw_entry)) {
2571 list_del(&ocw.ocw_entry);
2581 * Checks if requested extent lock is compatible with a lock under the page.
2583 * Checks if the lock under \a page is compatible with a read or write lock
2584 * (specified by \a rw) for an extent [\a start , \a end].
2586 * \param exp osc export
2587 * \param lsm striping information for the file
2588 * \param res osc_async_page placeholder
2589 * \param rw OBD_BRW_READ if requested for reading,
2590 * OBD_BRW_WRITE if requested for writing
2591 * \param start start of the requested extent
2592 * \param end end of the requested extent
2593 * \param cookie transparent parameter for passing locking context
2595 * \post result == 1, *cookie == context, appropriate lock is referenced or
2598 * \retval 1 owned lock is reused for the request
2599 * \retval 0 no lock reused for the request
2601 * \see osc_release_short_lock
/* Implementation note: oap is read from *res, and a single
 * ldlm_lock_fast_match() call on oap->oap_ldlm_lock is made while holding
 * oap->oap_lock; its result is stored in rc.
 * NOTE(review): part of the parameter list and the return statement are not
 * visible in this excerpt. */
2603 static int osc_reget_short_lock(struct obd_export *exp,
2604 struct lov_stripe_md *lsm,
2606 obd_off start, obd_off end,
2609 struct osc_async_page *oap = *res;
2614 spin_lock(&oap->oap_lock);
2615 rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2616 start, end, cookie);
2617 spin_unlock(&oap->oap_lock);
2623 * Releases a reference to a lock taken in a "fast" way.
2625 * Releases a read or a write (specified by \a rw) lock
2626 * referenced by \a cookie.
2628 * \param exp osc export
2629 * \param lsm striping information for the file
2630 * \param end end of the locked extent
2631 * \param rw OBD_BRW_READ if requested for reading,
2632 * OBD_BRW_WRITE if requested for writing
2633 * \param cookie transparent parameter for passing locking context
2635 * \post appropriate lock is dereferenced
2637 * \see osc_reget_short_lock
/* Implementation note: cookie and rw are handed straight to
 * ldlm_lock_fast_release(); exp, lsm and end are not used by the visible
 * code.
 * NOTE(review): the return statement is not visible in this excerpt. */
2639 static int osc_release_short_lock(struct obd_export *exp,
2640 struct lov_stripe_md *lsm, obd_off end,
2641 void *cookie, int rw)
2644 ldlm_lock_fast_release(cookie, rw);
2645 /* no error could have happened at this layer */
/* Initializes the osc_async_page associated with one page.
 *
 * Visible behavior: one path returns size_round(sizeof(*oap)) (its guarding
 * condition is not visible in this excerpt); otherwise the oap is stamped
 * with OAP_MAGIC, bound to the client obd of exp, given the caller ops and
 * data, the page and its object offset; its four list heads and its spinlock
 * are initialized, oap_occ.occ_interrupted is set to osc_occ_interrupted,
 * and cache_add_extent() is called on the client cache with the resource
 * name built from loi->loi_id and loi->loi_gr and with the supplied lockh.
 * NOTE(review): how oap is derived from res, the nocache test and the
 * handling of rc are not visible here. */
2649 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2650 struct lov_oinfo *loi, cfs_page_t *page,
2651 obd_off offset, struct obd_async_page_ops *ops,
2652 void *data, void **res, int nocache,
2653 struct lustre_handle *lockh)
2655 struct osc_async_page *oap;
2656 struct ldlm_res_id oid;
2661 return size_round(sizeof(*oap));
2664 oap->oap_magic = OAP_MAGIC;
2665 oap->oap_cli = &exp->exp_obd->u.cli;
2668 oap->oap_caller_ops = ops;
2669 oap->oap_caller_data = data;
2671 oap->oap_page = page;
2672 oap->oap_obj_off = offset;
2674 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2675 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2676 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2677 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2679 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2681 spin_lock_init(&oap->oap_lock);
2683 /* If the page was marked as notcacheable - don't add to any locks */
2685 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2686 /* This is the only place where we can call cache_add_extent
2687 without oap_lock, because this page is locked now, and
2688 the lock we are adding it to is referenced, so cannot lose
2689 any pages either. */
2690 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2695 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Interprets an opaque cookie as an osc_async_page.  A cookie whose
 * oap_magic is not OAP_MAGIC yields ERR_PTR(-EINVAL); the callers in this
 * file turn that into an error code with PTR_ERR().
 * NOTE(review): the success return is not visible in this excerpt. */
2699 struct osc_async_page *oap_from_cookie(void *cookie)
2701 struct osc_async_page *oap = cookie;
2702 if (oap->oap_magic != OAP_MAGIC)
2703 return ERR_PTR(-EINVAL);
/* Queues one prepared page (cookie) for asynchronous read or write.
 *
 * Visible steps: validate the cookie; check the import (NULL or
 * imp_invalid); check that the oap is not already on its pending, urgent or
 * rpc list; with HAVE_QUOTA_SUPPORT, for writes without OBD_BRW_NOQUOTA,
 * fill an obdo through ap_fill_obdo() and consult lquota_chkdq() with its
 * uid and gid; then, under cl_loi_list_lock, record off, count, brw_flags
 * and async_flags in the oap, call osc_enter_cache() for writes, move the
 * oap to pending with osc_oap_to_pending(), call loi_list_maint() and run
 * osc_check_rpcs().
 * NOTE(review): the error codes returned by the early checks, and the
 * condition guarding the loi reassignment, are not visible in this
 * excerpt. */
2707 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2708 struct lov_oinfo *loi, void *cookie,
2709 int cmd, obd_off off, int count,
2710 obd_flag brw_flags, enum async_flags async_flags)
2712 struct client_obd *cli = &exp->exp_obd->u.cli;
2713 struct osc_async_page *oap;
2717 oap = oap_from_cookie(cookie);
2719 RETURN(PTR_ERR(oap));
2721 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2724 if (!list_empty(&oap->oap_pending_item) ||
2725 !list_empty(&oap->oap_urgent_item) ||
2726 !list_empty(&oap->oap_rpc_item))
2729 /* check if the file's owner/group is over quota */
2730 #ifdef HAVE_QUOTA_SUPPORT
2731 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2732 struct obd_async_page_ops *ops;
2739 ops = oap->oap_caller_ops;
2740 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2741 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2752 loi = lsm->lsm_oinfo[0];
2754 client_obd_list_lock(&cli->cl_loi_list_lock);
2757 oap->oap_page_off = off;
2758 oap->oap_count = count;
2759 oap->oap_brw_flags = brw_flags;
2760 oap->oap_async_flags = async_flags;
2762 if (cmd & OBD_BRW_WRITE) {
2763 rc = osc_enter_cache(cli, loi, oap);
2765 client_obd_list_unlock(&cli->cl_loi_list_lock);
2770 osc_oap_to_pending(oap);
2771 loi_list_maint(cli, loi);
2773 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2776 osc_check_rpcs(cli);
2777 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* True when `flag` is clear in `was` and set in `now`, i.e. the caller is
 * newly turning the flag on; aka (~was & now & flag), but this is more
 * clear.  Every argument is parenthesized so that compound expressions
 * (for example a mask built with '|') expand with the intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
/* Adds async_flags to an oap that is already queued.
 *
 * Visible steps: validate the cookie; assert that the oap does not carry
 * OBD_BRW_SRVLOCK and that this is not a liblustre client; check the import;
 * pick the write or read loi_oap_pages according to oap_cmd; then, under
 * cl_loi_list_lock, fail with -EINVAL if the oap is not on a pending list,
 * set ASYNC_READY when it is newly requested, and, when ASYNC_URGENT is
 * newly requested and the oap is not part of an rpc, add it to lop_urgent
 * and call loi_list_maint().  osc_check_rpcs() runs before the lock is
 * dropped.
 * NOTE(review): several statements of this function are not visible in this
 * excerpt, including the return paths. */
2785 static int osc_set_async_flags(struct obd_export *exp,
2786 struct lov_stripe_md *lsm,
2787 struct lov_oinfo *loi, void *cookie,
2788 obd_flag async_flags)
2790 struct client_obd *cli = &exp->exp_obd->u.cli;
2791 struct loi_oap_pages *lop;
2792 struct osc_async_page *oap;
2796 oap = oap_from_cookie(cookie);
2798 RETURN(PTR_ERR(oap));
2801 * bug 7311: OST-side locking is only supported for liblustre for now
2802 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2803 * implementation has to handle case where OST-locked page was picked
2804 * up by, e.g., ->writepage().
2806 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2807 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2810 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2814 loi = lsm->lsm_oinfo[0];
2816 if (oap->oap_cmd & OBD_BRW_WRITE) {
2817 lop = &loi->loi_write_lop;
2819 lop = &loi->loi_read_lop;
2822 client_obd_list_lock(&cli->cl_loi_list_lock);
2824 if (list_empty(&oap->oap_pending_item))
2825 GOTO(out, rc = -EINVAL);
2827 if ((oap->oap_async_flags & async_flags) == async_flags)
2830 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2831 oap->oap_async_flags |= ASYNC_READY;
2833 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2834 if (list_empty(&oap->oap_rpc_item)) {
2835 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2836 loi_list_maint(cli, loi);
2840 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2841 oap->oap_async_flags);
2843 osc_check_rpcs(cli);
2844 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queues one page on the pending-group list of its object; the pages queued
 * here are moved to pending by osc_group_to_pending().
 *
 * Visible steps: validate the cookie, check the import, and check that the
 * oap is not already on a pending, urgent or rpc list; under
 * cl_loi_list_lock record off, count, brw_flags and async_flags, append the
 * oap to lop_pending_group of the write or read lop (chosen by cmd), and,
 * for ASYNC_GROUP_SYNC, register oap_occ with the io group through
 * oig_add_one().
 * NOTE(review): the error codes of the early checks and the return
 * statement are not visible in this excerpt. */
2848 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2849 struct lov_oinfo *loi,
2850 struct obd_io_group *oig, void *cookie,
2851 int cmd, obd_off off, int count,
2853 obd_flag async_flags)
2855 struct client_obd *cli = &exp->exp_obd->u.cli;
2856 struct osc_async_page *oap;
2857 struct loi_oap_pages *lop;
2861 oap = oap_from_cookie(cookie);
2863 RETURN(PTR_ERR(oap));
2865 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2868 if (!list_empty(&oap->oap_pending_item) ||
2869 !list_empty(&oap->oap_urgent_item) ||
2870 !list_empty(&oap->oap_rpc_item))
2874 loi = lsm->lsm_oinfo[0];
2876 client_obd_list_lock(&cli->cl_loi_list_lock);
2879 oap->oap_page_off = off;
2880 oap->oap_count = count;
2881 oap->oap_brw_flags = brw_flags;
2882 oap->oap_async_flags = async_flags;
2884 if (cmd & OBD_BRW_WRITE)
2885 lop = &loi->loi_write_lop;
2887 lop = &loi->loi_read_lop;
2889 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2890 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2892 rc = oig_add_one(oig, &oap->oap_occ);
2895 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2896 oap, oap->oap_page, rc);
2898 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Moves every oap on lop->lop_pending_group to the regular pending state via
 * osc_oap_to_pending(), then calls loi_list_maint() for the object.  The
 * _safe list iterator is used because each entry is unlinked inside the
 * loop.  cmd is not used by the visible code. */
2903 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2904 struct loi_oap_pages *lop, int cmd)
2906 struct list_head *pos, *tmp;
2907 struct osc_async_page *oap;
2909 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2910 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2911 list_del(&oap->oap_pending_item);
2912 osc_oap_to_pending(oap);
2914 loi_list_maint(cli, loi);
/* Releases the pages queued by osc_queue_group_io(): under cl_loi_list_lock
 * the write and then the read group list of the object are moved to pending
 * with osc_group_to_pending(), and osc_check_rpcs() is run.
 * NOTE(review): the use of oig, the condition guarding the loi reassignment
 * and the return value are not visible in this excerpt. */
2917 static int osc_trigger_group_io(struct obd_export *exp,
2918 struct lov_stripe_md *lsm,
2919 struct lov_oinfo *loi,
2920 struct obd_io_group *oig)
2922 struct client_obd *cli = &exp->exp_obd->u.cli;
2926 loi = lsm->lsm_oinfo[0];
2928 client_obd_list_lock(&cli->cl_loi_list_lock);
2930 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2931 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2933 osc_check_rpcs(cli);
2934 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Detaches an oap from the osc queues.
 *
 * Visible steps, under cl_loi_list_lock: fail with -EBUSY if the oap is
 * still on an rpc list; otherwise call osc_exit_cache() and
 * osc_wake_cache_waiters(), unlink the oap from the urgent list (clearing
 * ASYNC_URGENT) and from the pending list (adjusting the count by -1 via
 * lop_update_pending()), call loi_list_maint() and finally
 * cache_remove_extent() on the client cache.
 * NOTE(review): the out label and the return statement are not visible in
 * this excerpt. */
2939 static int osc_teardown_async_page(struct obd_export *exp,
2940 struct lov_stripe_md *lsm,
2941 struct lov_oinfo *loi, void *cookie)
2943 struct client_obd *cli = &exp->exp_obd->u.cli;
2944 struct loi_oap_pages *lop;
2945 struct osc_async_page *oap;
2949 oap = oap_from_cookie(cookie);
2951 RETURN(PTR_ERR(oap));
2954 loi = lsm->lsm_oinfo[0];
2956 if (oap->oap_cmd & OBD_BRW_WRITE) {
2957 lop = &loi->loi_write_lop;
2959 lop = &loi->loi_read_lop;
2962 client_obd_list_lock(&cli->cl_loi_list_lock);
2964 if (!list_empty(&oap->oap_rpc_item))
2965 GOTO(out, rc = -EBUSY);
2967 osc_exit_cache(cli, oap, 0);
2968 osc_wake_cache_waiters(cli);
2970 if (!list_empty(&oap->oap_urgent_item)) {
2971 list_del_init(&oap->oap_urgent_item);
2972 oap->oap_async_flags &= ~ASYNC_URGENT;
2974 if (!list_empty(&oap->oap_pending_item)) {
2975 list_del_init(&oap->oap_pending_item);
2976 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2978 loi_list_maint(cli, loi);
2979 cache_remove_extent(cli->cl_cache, oap);
2981 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2983 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Blocking/cancel callback for extent locks (exported symbol).
 *
 * Visible behavior: a data pointer whose value lies strictly between 0 and
 * 0x1000 is reported through LDLM_ERROR as bad data; for LDLM_CB_BLOCKING
 * the lock is cancelled with ldlm_cli_cancel() and a failure is logged; for
 * LDLM_CB_CANCELING the requested and granted modes are compared (a lock
 * that was never granted is left alone, per the comment), cache_remove_lock()
 * is called on the client cache, and the call is forwarded to
 * cl_ext_lock_cancel_cb when one is registered.
 * NOTE(review): the switch header, the breaks and the return statements are
 * not visible in this excerpt. */
2987 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2988 struct ldlm_lock_desc *new, void *data,
2991 struct lustre_handle lockh = { 0 };
2995 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2996 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
3001 case LDLM_CB_BLOCKING:
3002 ldlm_lock2handle(lock, &lockh);
3003 rc = ldlm_cli_cancel(&lockh);
3005 CERROR("ldlm_cli_cancel failed: %d\n", rc);
3007 case LDLM_CB_CANCELING: {
3009 ldlm_lock2handle(lock, &lockh);
3010 /* This lock wasn't granted, don't try to do anything */
3011 if (lock->l_req_mode != lock->l_granted_mode)
3014 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3017 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3018 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3019 lock, new, data,flag);
3028 EXPORT_SYMBOL(osc_extent_blocking_cb);
/* Stores data in l_ast_data of the lock named by lockh, with the lock and
 * its resource locked via lock_res_and_lock().  An error mentioning a
 * possible eviction is logged on one path (its condition is not visible in
 * this excerpt).  On Linux kernels, an existing and different l_ast_data is
 * treated as an inode: unless that old inode has I_FREEING set, an error is
 * logged and the LASSERTF fires.  The lock obtained from ldlm_handle2lock()
 * is released with LDLM_LOCK_PUT() at the end. */
3030 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3033 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3036 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3039 lock_res_and_lock(lock);
3040 #if defined (__KERNEL__) && defined (__linux__)
3041 /* Liang XXX: Darwin and Winnt checking should be added */
3042 if (lock->l_ast_data && lock->l_ast_data != data) {
3043 struct inode *new_inode = data;
3044 struct inode *old_inode = lock->l_ast_data;
3045 if (!(old_inode->i_state & I_FREEING))
3046 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3047 LASSERTF(old_inode->i_state & I_FREEING,
3048 "Found existing inode %p/%lu/%u state %lu in lock: "
3049 "setting data to %p/%lu/%u\n", old_inode,
3050 old_inode->i_ino, old_inode->i_generation,
3052 new_inode, new_inode->i_ino, new_inode->i_generation);
3055 lock->l_ast_data = data;
3056 unlock_res_and_lock(lock);
3057 LDLM_LOCK_PUT(lock);
/* Runs the replace iterator, with data as its argument, over the resource
 * named by the object id and group of lsm, using ldlm_resource_iterate() on
 * the namespace of the obd behind exp.
 * NOTE(review): the return statement is not visible in this excerpt. */
3060 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3061 ldlm_iterator_t replace, void *data)
3063 struct ldlm_res_id res_id;
3064 struct obd_device *obd = class_exp2obd(exp);
3066 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3067 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Common tail of an enqueue.  For ELDLM_LOCK_ABORTED the ldlm reply is
 * fetched from the request capsule and a non-zero lock_policy_res1 replaces
 * rc; when the result is success, or an aborted intent enqueue, the lvb
 * size, blocks and mtime of the first stripe are logged; cache_add_lock()
 * is called with oi_lockh on the client cache (its guarding condition is not
 * visible in this excerpt); finally the oi_cb_up callback is invoked with rc
 * and its result becomes rc. */
3071 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3072 struct obd_info *oinfo, int intent, int rc)
3077 /* The request was created before ldlm_cli_enqueue call. */
3078 if (rc == ELDLM_LOCK_ABORTED) {
3079 struct ldlm_reply *rep;
3080 rep = req_capsule_server_get(&req->rq_pill,
3083 LASSERT(rep != NULL);
3084 if (rep->lock_policy_res1)
3085 rc = rep->lock_policy_res1;
3089 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3090 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3091 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3092 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3093 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3097 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3099 /* Call the update callback. */
3100 rc = oinfo->oi_cb_up(oinfo, rc);
/* Reply interpreter installed by osc_enqueue() for asynchronous enqueues.
 * It finishes the ldlm side with ldlm_cli_enqueue_fini(), runs
 * osc_enqueue_fini(), and, when the lock handle is in use and rc is
 * ELDLM_OK, drops a lock reference of mode ei_mode with ldlm_lock_decref().
 * The lock looked up from oi_lockh is asserted to be non-NULL and released
 * with LDLM_LOCK_PUT().
 * NOTE(review): the return statement is not visible in this excerpt. */
3104 static int osc_enqueue_interpret(const struct lu_env *env,
3105 struct ptlrpc_request *req,
3106 struct osc_enqueue_args *aa, int rc)
3108 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3109 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3110 struct ldlm_lock *lock;
3112 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3114 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3116 /* Complete obtaining the lock procedure. */
3117 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3119 &aa->oa_oi->oi_flags,
3120 &lsm->lsm_oinfo[0]->loi_lvb,
3121 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3122 lustre_swab_ost_lvb,
3123 aa->oa_oi->oi_lockh, rc);
3125 /* Complete osc stuff. */
3126 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3128 /* Release the lock for async request. */
3129 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3130 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3132 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3133 aa->oa_oi->oi_lockh, req, aa);
3134 LDLM_LOCK_PUT(lock);
3138 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3139 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3140 * other synchronous requests, however keeping some locks and trying to obtain
3141 * others may take a considerable amount of time in a case of ost failure; and
3142 * when other sync requests do not get released lock from a client, the client
3143 * is excluded from the cluster -- such scenarios make life difficult, so
3144 * release locks just after they are obtained. */
/* Visible flow: build the resource name from the object id and group; round
 * the requested extent out to page boundaries; look for an existing lock
 * with ldlm_lock_match() (LDLM_FL_LVB_READY is added to the flags).  On a
 * match the ast data is set, oi_cb_up() is called with ELDLM_OK and lock
 * references are adjusted.  Otherwise an RQF_LDLM_ENQUEUE_LVB request is
 * allocated and prepared on one path, LDLM_FL_BLOCK_GRANTED is cleared from
 * oi_flags, ldlm_cli_enqueue() is called, and the request is either added to
 * rqset with osc_enqueue_interpret() as its interpreter or completed through
 * osc_enqueue_fini().
 * NOTE(review): many conditions and return paths of this function are not
 * visible in this excerpt; confirm against the full source before relying on
 * this summary. */
3145 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3146 struct ldlm_enqueue_info *einfo,
3147 struct ptlrpc_request_set *rqset)
3149 struct ldlm_res_id res_id;
3150 struct obd_device *obd = exp->exp_obd;
3151 struct ptlrpc_request *req = NULL;
3152 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3158 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3159 oinfo->oi_md->lsm_object_gr, &res_id);
3160 /* Filesystem lock extents are extended to page boundaries so that
3161 * dealing with the page cache is a little smoother. */
3162 oinfo->oi_policy.l_extent.start -=
3163 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3164 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3166 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3169 /* Next, search for already existing extent locks that will cover us */
3170 /* If we're trying to read, we also search for an existing PW lock. The
3171 * VFS and page cache already protect us locally, so lots of readers/
3172 * writers can share a single PW lock.
3174 * There are problems with conversion deadlocks, so instead of
3175 * converting a read lock to a write lock, we'll just enqueue a new
3178 * At some point we should cancel the read lock instead of making them
3179 * send us a blocking callback, but there are problems with canceling
3180 * locks out from other users right now, too. */
3181 mode = einfo->ei_mode;
3182 if (einfo->ei_mode == LCK_PR)
3184 mode = ldlm_lock_match(obd->obd_namespace,
3185 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3186 einfo->ei_type, &oinfo->oi_policy, mode,
3189 /* addref the lock only if not async requests and PW lock is
3190 * matched whereas we asked for PR. */
3191 if (!rqset && einfo->ei_mode != mode)
3192 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3193 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3196 /* I would like to be able to ASSERT here that rss <=
3197 * kms, but I can't, for reasons which are explained in
3201 /* We already have a lock, and it's referenced */
3202 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3204 /* For async requests, decref the lock. */
3205 if (einfo->ei_mode != mode)
3206 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3208 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3215 CFS_LIST_HEAD(cancels);
3216 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3217 &RQF_LDLM_ENQUEUE_LVB);
3221 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3225 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3226 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3227 ptlrpc_request_set_replen(req);
3230 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3231 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3233 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3234 &oinfo->oi_policy, &oinfo->oi_flags,
3235 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3236 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3237 lustre_swab_ost_lvb, oinfo->oi_lockh,
3241 struct osc_enqueue_args *aa;
3242 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3243 aa = ptlrpc_req_async_args(req);
3248 req->rq_interpret_reply =
3249 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3250 ptlrpc_set_add_req(rqset, req);
3251 } else if (intent) {
3252 ptlrpc_req_finished(req);
3257 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3259 ptlrpc_req_finished(req);
/* Looks for an already granted extent lock that covers the given policy.
 *
 * Visible steps: build the resource name from the object id and group of
 * lsm; evaluate the OBD_FAIL_OSC_MATCH fail point; round the extent out to
 * page boundaries (this modifies *policy in place); call ldlm_lock_match()
 * with LDLM_FL_LVB_READY added to the flags.  On one path the ast data is
 * set with osc_set_data_with_check(), and when LDLM_FL_TEST_LOCK is not set
 * and the matched mode differs from the requested mode, a PR reference is
 * taken and a PW reference is dropped on the lock.
 * NOTE(review): the computation of the mode passed to ldlm_lock_match() and
 * the return statements are not visible in this excerpt. */
3264 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3265 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3266 int *flags, void *data, struct lustre_handle *lockh)
3268 struct ldlm_res_id res_id;
3269 struct obd_device *obd = exp->exp_obd;
3270 int lflags = *flags;
3274 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3276 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3279 /* Filesystem lock extents are extended to page boundaries so that
3280 * dealing with the page cache is a little smoother */
3281 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3282 policy->l_extent.end |= ~CFS_PAGE_MASK;
3284 /* Next, search for already existing extent locks that will cover us */
3285 /* If we're trying to read, we also search for an existing PW lock. The
3286 * VFS and page cache already protect us locally, so lots of readers/
3287 * writers can share a single PW lock. */
3291 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3292 &res_id, type, policy, rc, lockh);
3294 osc_set_data_with_check(lockh, data, lflags);
3295 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3296 ldlm_lock_addref(lockh, LCK_PR);
3297 ldlm_lock_decref(lockh, LCK_PW);
/* Drops one reference of the given mode on the lock named by lockh.  For
 * LCK_GROUP (marked unlikely) ldlm_lock_decref_and_cancel() is used; the
 * other visible call is the plain ldlm_lock_decref().  exp and md are not
 * used by the visible code.
 * NOTE(review): the else keyword and the return statement are not visible
 * in this excerpt. */
3304 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3305 __u32 mode, struct lustre_handle *lockh)
3309 if (unlikely(mode == LCK_GROUP))
3310 ldlm_lock_decref_and_cancel(lockh, mode);
3312 ldlm_lock_decref(lockh, mode);
/* Calls ldlm_cli_cancel_unused() on the namespace of the obd behind exp and
 * returns its result.  resp starts as NULL and is set to the resource name
 * built from the object id and group of lsm on one path (the guarding
 * condition is not visible in this excerpt). */
3317 static int osc_cancel_unused(struct obd_export *exp,
3318 struct lov_stripe_md *lsm, int flags,
3321 struct obd_device *obd = class_exp2obd(exp);
3322 struct ldlm_res_id res_id, *resp = NULL;
3325 resp = osc_build_res_name(lsm->lsm_object_id,
3326 lsm->lsm_object_gr, &res_id);
3329 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter installed by osc_statfs_async().  It fetches the
 * obd_statfs reply buffer from the request capsule, copies it into
 * *aa->aa_oi->oi_osfs, and passes rc to the oi_cb_up callback, whose result
 * becomes rc.  -EPROTO is set on one path (presumably a missing reply
 * buffer; the condition is not visible in this excerpt). */
3332 static int osc_statfs_interpret(const struct lu_env *env,
3333 struct ptlrpc_request *req,
3334 struct osc_async_args *aa, int rc)
3336 struct obd_statfs *msfs;
3342 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3344 GOTO(out, rc = -EPROTO);
3347 *aa->aa_oi->oi_osfs = *msfs;
3349 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Sends an OST_STATFS request asynchronously.
 *
 * Visible steps: allocate an RQF_OST_STATFS request on the client import,
 * pack it (freeing the request on one path), set the reply length, route it
 * to OST_CREATE_PORTAL, set its timeout, optionally mark it no-resend and
 * no-delay for OBD_STATFS_NODELAY, install osc_statfs_interpret() as the
 * reply interpreter and add the request to rqset.  max_age is not used by
 * the visible code.
 * NOTE(review): the filling of the async args and the return statements are
 * not visible in this excerpt. */
3353 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3354 __u64 max_age, struct ptlrpc_request_set *rqset)
3356 struct ptlrpc_request *req;
3357 struct osc_async_args *aa;
3361 /* We could possibly pass max_age in the request (as an absolute
3362 * timestamp or a "seconds.usec ago") so the target can avoid doing
3363 * extra calls into the filesystem if that isn't necessary (e.g.
3364 * during mount that would help a bit). Having relative timestamps
3365 * is not so great if request processing is slow, while absolute
3366 * timestamps are not ideal because they need time synchronization. */
3367 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3371 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3373 ptlrpc_request_free(req);
3376 ptlrpc_request_set_replen(req);
3377 req->rq_request_portal = OST_CREATE_PORTAL;
3378 ptlrpc_at_set_req_timeout(req);
3380 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3381 /* procfs requests must not block waiting for statfs, to avoid deadlock */
3382 req->rq_no_resend = 1;
3383 req->rq_no_delay = 1;
3386 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3387 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3388 aa = ptlrpc_req_async_args(req);
3391 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS.
 *
 * Visible steps: take a reference on the client import while holding cl_sem
 * for reading; allocate an RQF_OST_STATFS request on it and drop the import
 * reference; pack the request (freeing it on one path); set the reply
 * length, portal (OST_CREATE_PORTAL) and timeout; optionally mark it
 * no-resend and no-delay for OBD_STATFS_NODELAY; wait for the reply with
 * ptlrpc_queue_wait(); fetch the obd_statfs reply buffer; finish the
 * request.  max_age is not used by the visible code.
 * NOTE(review): the copy into osfs, the error checks and the return
 * statements are not visible in this excerpt. */
3395 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3396 __u64 max_age, __u32 flags)
3398 struct obd_statfs *msfs;
3399 struct ptlrpc_request *req;
3400 struct obd_import *imp = NULL;
3404 /* Since the request might also come from lprocfs, we need to
3405 * sync this with client_disconnect_export (Bug 15684) */
3406 down_read(&obd->u.cli.cl_sem);
3407 if (obd->u.cli.cl_import)
3408 imp = class_import_get(obd->u.cli.cl_import);
3409 up_read(&obd->u.cli.cl_sem);
3413 /* We could possibly pass max_age in the request (as an absolute
3414 * timestamp or a "seconds.usec ago") so the target can avoid doing
3415 * extra calls into the filesystem if that isn't necessary (e.g.
3416 * during mount that would help a bit). Having relative timestamps
3417 * is not so great if request processing is slow, while absolute
3418 * timestamps are not ideal because they need time synchronization. */
3419 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3421 class_import_put(imp);
3426 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3428 ptlrpc_request_free(req);
3431 ptlrpc_request_set_replen(req);
3432 req->rq_request_portal = OST_CREATE_PORTAL;
3433 ptlrpc_at_set_req_timeout(req);
3435 if (flags & OBD_STATFS_NODELAY) {
3436 /* procfs requests must not block waiting for statfs, to avoid deadlock */
3437 req->rq_no_resend = 1;
3438 req->rq_no_delay = 1;
3441 rc = ptlrpc_queue_wait(req);
3445 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3447 GOTO(out, rc = -EPROTO);
3454 ptlrpc_req_finished(req);
/* Summary of the visible code: only the lov_user_md_v1-sized header is
 * copied in from user space; the magic must be LOV_USER_MAGIC_V1 or
 * LOV_USER_MAGIC_V3.  When the caller asked for at least one stripe, a
 * kernel buffer of lov_mds_md_size(stripe_count, magic) bytes is allocated
 * and the object id of lsm is stored in its first lmm_objects slot (located
 * according to the magic); otherwise the size for zero stripes is used.
 * The object id, object group and a stripe count of 1 are then written to
 * the kernel copy, which is copied back to user space and freed on one
 * path.
 * NOTE(review): the error codes, the setup of lumk in the zero-stripe case
 * and the condition guarding OBD_FREE are not visible in this excerpt. */
3458 /* Retrieve object striping information.
3460 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3461 * the maximum number of OST indices which will fit in the user buffer.
3462 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3464 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3466 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3467 struct lov_user_md_v3 lum, *lumk;
3468 struct lov_user_ost_data_v1 *lmm_objects;
3469 int rc = 0, lum_size;
3475 /* we only need the header part from user space to get lmm_magic and
3476 * lmm_stripe_count, (the header part is common to v1 and v3) */
3477 lum_size = sizeof(struct lov_user_md_v1);
3478 if (copy_from_user(&lum, lump, lum_size))
3481 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3482 (lum.lmm_magic != LOV_USER_MAGIC_V3))
3485 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3486 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3487 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3488 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3490 /* we can use lov_mds_md_size() to compute lum_size
3491 * because lov_user_md_vX and lov_mds_md_vX have the same size */
3492 if (lum.lmm_stripe_count > 0) {
3493 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3494 OBD_ALLOC(lumk, lum_size);
3498 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3499 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3501 lmm_objects = &(lumk->lmm_objects[0]);
3502 lmm_objects->l_object_id = lsm->lsm_object_id;
3504 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3508 lumk->lmm_object_id = lsm->lsm_object_id;
3509 lumk->lmm_object_gr = lsm->lsm_object_gr;
3510 lumk->lmm_stripe_count = 1;
3512 if (copy_to_user(lump, lumk, lum_size))
3516 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the osc.  A module reference is taken for the
 * duration of the call and dropped with module_put() on the way out.
 *
 * Visible commands:
 *  - OBD_IOC_LOV_GET_CONFIG: copy in the ioctl data, require inllen1 to hold
 *    a lov_desc and inllen2 to hold an obd_uuid (else -EINVAL), fill the
 *    descriptor for a single target with the obd uuid, and copy the buffer
 *    back to user space;
 *  - LL_IOC_LOV_SETSTRIPE: obd_alloc_memmd();
 *  - LL_IOC_LOV_GETSTRIPE: osc_getstripe();
 *  - OBD_IOC_CLIENT_RECOVER: ptlrpc_recover_import();
 *  - IOC_OSC_SET_ACTIVE: ptlrpc_set_import_active();
 *  - OBD_IOC_POLL_QUOTACHECK: lquota_poll_check();
 *  - anything else is logged and fails with -ENOTTY.
 * NOTE(review): the switch header, the case terminators and the return
 * statement are not visible in this excerpt. */
3522 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3523 void *karg, void *uarg)
3525 struct obd_device *obd = exp->exp_obd;
3526 struct obd_ioctl_data *data = karg;
3530 if (!try_module_get(THIS_MODULE)) {
/* NOTE(review): this message has no trailing newline, unlike the other
 * CERROR format strings visible in this file. */
3531 CERROR("Can't get module. Is it alive?");
3535 case OBD_IOC_LOV_GET_CONFIG: {
3537 struct lov_desc *desc;
3538 struct obd_uuid uuid;
3542 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3543 GOTO(out, err = -EINVAL);
3545 data = (struct obd_ioctl_data *)buf;
3547 if (sizeof(*desc) > data->ioc_inllen1) {
3548 obd_ioctl_freedata(buf, len);
3549 GOTO(out, err = -EINVAL);
3552 if (data->ioc_inllen2 < sizeof(uuid)) {
3553 obd_ioctl_freedata(buf, len);
3554 GOTO(out, err = -EINVAL);
3557 desc = (struct lov_desc *)data->ioc_inlbuf1;
3558 desc->ld_tgt_count = 1;
3559 desc->ld_active_tgt_count = 1;
3560 desc->ld_default_stripe_count = 1;
3561 desc->ld_default_stripe_size = 0;
3562 desc->ld_default_stripe_offset = 0;
3563 desc->ld_pattern = 0;
3564 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3566 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3568 err = copy_to_user((void *)uarg, buf, len);
3571 obd_ioctl_freedata(buf, len);
3574 case LL_IOC_LOV_SETSTRIPE:
3575 err = obd_alloc_memmd(exp, karg);
3579 case LL_IOC_LOV_GETSTRIPE:
3580 err = osc_getstripe(karg, uarg);
3582 case OBD_IOC_CLIENT_RECOVER:
3583 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3588 case IOC_OSC_SET_ACTIVE:
3589 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3592 case OBD_IOC_POLL_QUOTACHECK:
3593 err = lquota_poll_check(quota_interface, exp,
3594 (struct if_quotacheck *)karg);
3597 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3598 cmd, cfs_curproc_comm());
3599 GOTO(out, err = -ENOTTY);
3602 module_put(THIS_MODULE);
/* Key/value query interface of the osc.  vallen and val must both be
 * non-NULL (the check is visible; its error code is not).
 *
 * Visible keys:
 *  - KEY_LOCK_TO_STRIPE: *vallen is set to the size of a __u32 stripe index;
 *  - KEY_LAST_ID: an OST_GET_INFO request carrying the key is sent
 *    synchronously and the obd_id from the reply is stored through val;
 *  - KEY_FIEMAP: an OST_GET_INFO request carrying both the key and the
 *    caller-supplied val (*vallen bytes, also the expected reply size) is
 *    sent synchronously and the reply is copied back over val.
 * Both RPC paths free the request if packing fails on one path, use -EPROTO
 * on one path after fetching the reply buffer, and finish the request with
 * ptlrpc_req_finished().
 * NOTE(review): the error checks and return statements are not visible in
 * this excerpt. */
3606 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3607 void *key, __u32 *vallen, void *val,
3608 struct lov_stripe_md *lsm)
3611 if (!vallen || !val)
3614 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3615 __u32 *stripe = val;
3616 *vallen = sizeof(*stripe);
3619 } else if (KEY_IS(KEY_LAST_ID)) {
3620 struct ptlrpc_request *req;
3625 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3626 &RQF_OST_GET_INFO_LAST_ID);
3630 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3631 RCL_CLIENT, keylen);
3632 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3634 ptlrpc_request_free(req);
3638 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3639 memcpy(tmp, key, keylen);
3641 ptlrpc_request_set_replen(req);
3642 rc = ptlrpc_queue_wait(req);
3646 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3648 GOTO(out, rc = -EPROTO);
3650 *((obd_id *)val) = *reply;
3652 ptlrpc_req_finished(req);
3654 } else if (KEY_IS(KEY_FIEMAP)) {
3655 struct ptlrpc_request *req;
3656 struct ll_user_fiemap *reply;
3660 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3661 &RQF_OST_GET_INFO_FIEMAP);
3665 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3666 RCL_CLIENT, keylen);
3667 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3668 RCL_CLIENT, *vallen);
3669 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3670 RCL_SERVER, *vallen);
3672 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3674 ptlrpc_request_free(req);
3678 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3679 memcpy(tmp, key, keylen);
3680 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3681 memcpy(tmp, val, *vallen);
3683 ptlrpc_request_set_replen(req);
3684 rc = ptlrpc_queue_wait(req);
3688 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3690 GOTO(out1, rc = -EPROTO);
3692 memcpy(val, reply, *vallen);
3694 ptlrpc_req_finished(req);
/*
 * Reply-interpret callback that osc_set_info_async() installs on the
 * request it sends for KEY_MDS_CONN.
 *
 * It looks up the LLOG_MDS_OST_ORIG_CTXT llog context of the import's obd,
 * asks llog_initiator_connect() to connect it (logging an error with the
 * context pointer and rc on the failure path), drops the context reference,
 * and then marks the import as imp_server_timeout and imp_pingable.
 *
 * env: not referenced in the visible lines
 * req: the completed request; only req->rq_import is used here
 *
 * NOTE(review): the remaining parameters, the early-exit tests and the
 * return statement are on lines not shown in this excerpt.
 */
3702 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3703 struct ptlrpc_request *req,
3706 struct llog_ctxt *ctxt;
3707 struct obd_import *imp = req->rq_import;
3713 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3716 rc = llog_initiator_connect(ctxt);
3718 CERROR("cannot establish connection for "
3719 "ctxt %p: %d\n", ctxt, rc);
/* Balance the reference taken by llog_get_context() above. */
3722 llog_ctxt_put(ctxt);
/* Both import flags are updated together under imp_lock. */
3723 spin_lock(&imp->imp_lock);
3724 imp->imp_server_timeout = 1;
3725 imp->imp_pingable = 1;
3726 spin_unlock(&imp->imp_lock);
3727 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/*
 * Set an info key on this OSC, either locally or by forwarding it to the
 * OST in an OST_SET_INFO request.
 *
 * Keys with dedicated handling in the visible lines:
 *   KEY_NEXT_ID    - val is an obd_id; the creator's oscc_next_id becomes
 *                    that value plus one.
 *   KEY_UNLINKED   - clears OSCC_FLAG_NOSPC on the creator.
 *   KEY_INIT_RECOV - val is an int stored in imp_initial_recov.
 *   KEY_CHECKSUM   - val is an int; cl_checksum is set to 1 if it is
 *                    non-zero, otherwise 0.
 *   KEY_FLUSH_CTX  - calls sptlrpc_import_flush_my_ctx() on the import.
 *
 * Any other key is packed, together with val, into an RQF_OST_SET_INFO
 * request that is added to the caller's request set.  For KEY_MDS_CONN the
 * group number read from val is additionally recorded in the creator's
 * oscc_oa (and asserted to be greater than zero), and
 * osc_setinfo_mds_conn_interpret() is installed as the reply callback.
 *
 * exp:    export being configured
 * keylen: length in bytes of key
 * key:    key buffer
 * vallen: length in bytes of val; checked against the expected size for
 *         KEY_NEXT_ID, KEY_INIT_RECOV and KEY_CHECKSUM
 * val:    value buffer
 * set:    request set that receives the forwarded request
 *
 * NOTE(review): the return statements and the bodies of the size-mismatch
 * tests are on lines not shown in this excerpt.
 */
3732 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3733 void *key, obd_count vallen, void *val,
3734 struct ptlrpc_request_set *set)
3736 struct ptlrpc_request *req;
3737 struct obd_device *obd = exp->exp_obd;
3738 struct obd_import *imp = class_exp2cliimp(exp);
3743 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3745 if (KEY_IS(KEY_NEXT_ID)) {
3746 if (vallen != sizeof(obd_id))
/* The next id to hand out is one past the id supplied by the caller. */
3750 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3751 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3752 exp->exp_obd->obd_name,
3753 obd->u.cli.cl_oscc.oscc_next_id);
3758 if (KEY_IS(KEY_UNLINKED)) {
3759 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
/* The creator flags are only modified under oscc_lock. */
3760 spin_lock(&oscc->oscc_lock);
3761 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3762 spin_unlock(&oscc->oscc_lock);
3766 if (KEY_IS(KEY_INIT_RECOV)) {
3767 if (vallen != sizeof(int))
3769 spin_lock(&imp->imp_lock);
3770 imp->imp_initial_recov = *(int *)val;
3771 spin_unlock(&imp->imp_lock);
3772 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3773 exp->exp_obd->obd_name,
3774 imp->imp_initial_recov);
3778 if (KEY_IS(KEY_CHECKSUM)) {
3779 if (vallen != sizeof(int))
/* Normalise any non-zero value to 1. */
3781 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3785 if (KEY_IS(KEY_FLUSH_CTX)) {
3786 sptlrpc_import_flush_my_ctx(imp);
3793 /* We pass all other commands directly to OST. Since nobody calls osc
3794 methods directly and everybody is supposed to go through LOV, we
3795 assume lov checked invalid values for us.
3796 The only recognised values so far are evict_by_nid and mds_conn.
3797 Even if something bad goes through, we'd get a -EINVAL from OST
3801 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3805 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3806 RCL_CLIENT, keylen);
3807 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3808 RCL_CLIENT, vallen);
3809 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3811 ptlrpc_request_free(req);
3815 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3816 memcpy(tmp, key, keylen);
3817 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3818 memcpy(tmp, val, vallen);
3820 if (KEY_IS(KEY_MDS_CONN)) {
3821 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3823 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3824 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3825 LASSERT(oscc->oscc_oa.o_gr > 0);
3826 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3829 ptlrpc_request_set_replen(req);
3830 ptlrpc_set_add_req(set, req);
3831 ptlrpc_check_set(NULL, set);
/*
 * llog operations handed to llog_setup() for the LLOG_SIZE_REPL_CTXT
 * context in osc_llog_init().  Only lop_cancel is set in the visible
 * initialiser (written with the old GCC "field:" designator syntax).
 */
3837 static struct llog_operations osc_size_repl_logops = {
3838 lop_cancel: llog_obd_repl_cancel
/*
 * llog operations for the LLOG_MDS_OST_ORIG_CTXT context.  Left
 * zero-initialised here and filled in on first use by osc_llog_init().
 */
3841 static struct llog_operations osc_mds_ost_orig_logops;
/*
 * Set up the two llog contexts used by this OSC:
 * LLOG_MDS_OST_ORIG_CTXT (using catid->lci_logid and
 * osc_mds_ost_orig_logops) and LLOG_SIZE_REPL_CTXT (no log id, using
 * osc_size_repl_logops).
 *
 * obd:   the OSC device; olg is asserted to be its own obd_olg
 * olg:   llog group, must be &obd->obd_olg
 * tgt:   target device passed through to llog_setup()
 * count: passed through to llog_setup()
 * catid: catalog id; its lci_logid is used for the originator context
 * uuid:  not referenced in the visible lines
 *
 * Failures are reported with CERROR, including the device names, count,
 * catid pointer, rc and the log id.
 *
 * NOTE(review): the tests guarding the CERROR calls and the return
 * statement are on lines not shown in this excerpt.
 * NOTE(review): no cleanup of the first context is visible for the case
 * where the second llog_setup() fails - confirm against the full source.
 */
3842 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3843 struct obd_device *tgt, int count,
3844 struct llog_catid *catid, struct obd_uuid *uuid)
3849 LASSERT(olg == &obd->obd_olg);
/*
 * One-time initialisation of the shared ops table: start from a copy of
 * llog_lvfs_ops and override four methods.  lop_setup doubles as the
 * "already initialised" marker, and the check runs under obd_dev_lock.
 */
3850 spin_lock(&obd->obd_dev_lock);
3851 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3852 osc_mds_ost_orig_logops = llog_lvfs_ops;
3853 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3854 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3855 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3856 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3858 spin_unlock(&obd->obd_dev_lock);
3860 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3861 &catid->lci_logid, &osc_mds_ost_orig_logops);
3863 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3867 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3868 NULL, &osc_size_repl_logops);
3870 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
/* Extra diagnostics identifying which device and log were involved. */
3873 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3874 obd->obd_name, tgt->obd_name, count, catid, rc);
3875 CERROR("logid "LPX64":0x%x\n",
3876 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/*
 * Tear down the llog contexts created by osc_llog_init(): first
 * LLOG_MDS_OST_ORIG_CTXT, then LLOG_SIZE_REPL_CTXT.  Each llog_cleanup()
 * result is kept separately (rc and rc2).
 *
 * obd:   the OSC device owning the contexts
 * count: not referenced in the visible lines
 *
 * NOTE(review): how rc and rc2 are combined into the return value, and any
 * NULL tests on ctxt, are on lines not shown in this excerpt.
 */
3881 static int osc_llog_finish(struct obd_device *obd, int count)
3883 struct llog_ctxt *ctxt;
3884 int rc = 0, rc2 = 0;
3887 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3889 rc = llog_cleanup(ctxt);
3891 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3893 rc2 = llog_cleanup(ctxt);
/*
 * Reconnect hook: fill in the grant request carried by the connect data.
 *
 * When data is non-NULL and OBD_CONNECT_GRANT is set in its connect flags,
 * data->ocd_grant is set to the currently available grant or, if that is
 * zero, to (2 * cl_max_pages_per_rpc) << CFS_PAGE_SHIFT.  The accumulated
 * cl_lost_grant is copied to a local for logging and reset to zero.  Both
 * steps happen under cl_loi_list_lock.
 *
 * env:    not referenced in the visible lines
 * exp:    not referenced in the visible lines
 * obd:    the OSC device whose client state is consulted
 * cluuid: not referenced in the visible lines
 * data:   connect data to update; may be NULL
 *
 * NOTE(review): the return statement is on a line not shown in this excerpt.
 */
3900 static int osc_reconnect(const struct lu_env *env,
3901 struct obd_export *exp, struct obd_device *obd,
3902 struct obd_uuid *cluuid,
3903 struct obd_connect_data *data)
3905 struct client_obd *cli = &obd->u.cli;
3907 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3910 client_obd_list_lock(&cli->cl_loi_list_lock);
/*
 * "a ?: b" is the GNU shorthand for "a ? a : b".  The multiplication
 * binds tighter than the shift, so the fallback is two RPCs' worth of
 * pages converted to bytes.
 */
3911 data->ocd_grant = cli->cl_avail_grant ?:
3912 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3913 lost_grant = cli->cl_lost_grant;
3914 cli->cl_lost_grant = 0;
3915 client_obd_list_unlock(&cli->cl_loi_list_lock);
3917 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3918 "cl_lost_grant: %ld\n", data->ocd_grant,
3919 cli->cl_avail_grant, lost_grant);
3920 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3921 " ocd_grant: %d\n", data->ocd_connect_flags,
3922 data->ocd_version, data->ocd_grant);
/*
 * Disconnect an export from this OSC.
 *
 * If this is the last connection (cl_conn_count == 1) the
 * LLOG_SIZE_REPL_CTXT context is synced first, then the context reference
 * is dropped and the generic client_disconnect_export() does the rest.
 *
 * exp: export being disconnected
 *
 * NOTE(review): ctxt is passed to llog_sync() without a visible NULL
 * check - confirm that llog_get_context() cannot return NULL here or that
 * llog_sync()/llog_ctxt_put() tolerate it.
 */
3928 static int osc_disconnect(struct obd_export *exp)
3930 struct obd_device *obd = class_exp2obd(exp);
3931 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3934 if (obd->u.cli.cl_conn_count == 1)
3935 /* flush any remaining cancel messages out to the target */
3936 llog_sync(ctxt, exp);
/* Balance the reference taken by llog_get_context() above. */
3938 llog_ctxt_put(ctxt);
3940 rc = client_disconnect_export(exp);
/*
 * React to a state change of the import belonging to this OSC.
 *
 * Handling per event, as far as visible here:
 *   IMP_EVENT_DISCON     - if imp_server_timeout is set, flag the object
 *                          creator as OSCC_FLAG_RECOVERING; then zero
 *                          cl_avail_grant and cl_lost_grant.
 *   IMP_EVENT_INACTIVE   - notify the observer with OBD_NOTIFY_INACTIVE.
 *   IMP_EVENT_INVALIDATE - run osc_check_rpcs() under the list lock, then
 *                          clean the lock namespace with LDLM_FL_LOCAL_ONLY.
 *   IMP_EVENT_ACTIVE     - if imp_server_timeout is set, clear
 *                          OSCC_FLAG_NOSPC; notify with OBD_NOTIFY_ACTIVE.
 *   IMP_EVENT_OCD        - apply the negotiated connect data (grant and
 *                          request portal); notify with OBD_NOTIFY_OCD.
 *   anything else        - logged as an unknown import event.
 *
 * obd:   the OSC device; asserted to be imp->imp_obd
 * imp:   the import that changed state
 * event: which change happened
 *
 * NOTE(review): the switch header, break statements, the assignments of
 * cli and the return statement are on lines not shown in this excerpt.
 */
3944 static int osc_import_event(struct obd_device *obd,
3945 struct obd_import *imp,
3946 enum obd_import_event event)
3948 struct client_obd *cli;
3952 LASSERT(imp->imp_obd == obd);
3955 case IMP_EVENT_DISCON: {
3956 /* Only do this on the MDS OSC's */
3957 if (imp->imp_server_timeout) {
3958 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3960 spin_lock(&oscc->oscc_lock);
3961 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3962 spin_unlock(&oscc->oscc_lock);
/* Both grant counters are reset together under the list lock. */
3965 client_obd_list_lock(&cli->cl_loi_list_lock);
3966 cli->cl_avail_grant = 0;
3967 cli->cl_lost_grant = 0;
3968 client_obd_list_unlock(&cli->cl_loi_list_lock);
3971 case IMP_EVENT_INACTIVE: {
3972 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3975 case IMP_EVENT_INVALIDATE: {
3976 struct ldlm_namespace *ns = obd->obd_namespace;
3980 client_obd_list_lock(&cli->cl_loi_list_lock);
3981 /* all pages go to failing rpcs due to the invalid import */
3982 osc_check_rpcs(cli);
3983 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Namespace cleanup is requested with the local-only flag. */
3985 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3989 case IMP_EVENT_ACTIVE: {
3990 /* Only do this on the MDS OSC's */
3991 if (imp->imp_server_timeout) {
3992 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3994 spin_lock(&oscc->oscc_lock);
3995 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3996 spin_unlock(&oscc->oscc_lock);
3998 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4001 case IMP_EVENT_OCD: {
4002 struct obd_connect_data *ocd = &imp->imp_connect_data;
/* Grant is only initialised when OBD_CONNECT_GRANT is set. */
4004 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4005 osc_init_grant(&obd->u.cli, ocd);
/* With OBD_CONNECT_REQPORTAL set, requests go to OST_REQUEST_PORTAL. */
4008 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4009 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4011 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4015 CERROR("Unknown import event %d\n", event);
/*
 * Set up an OSC device.
 *
 * Visible steps: take a ptlrpcd reference, run the generic
 * client_obd_setup(), register the lprocfs entries (attaching the seqstat,
 * sptlrpc and ptlrpc entries only when lprocfs_obd_setup() returns 0),
 * create a request pool on the import and create the extent cache.
 *
 * obd:  device being set up
 * lcfg: configuration record passed through to client_obd_setup()
 *
 * NOTE(review): the error checks after each step, the handling of a failed
 * cache_create() and the return statement are on lines not shown in this
 * excerpt.
 */
4021 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4027 rc = ptlrpcd_addref();
4031 rc = client_obd_setup(obd, lcfg);
4035 struct lprocfs_static_vars lvars = { 0 };
4036 struct client_obd *cli = &obd->u.cli;
4038 lprocfs_osc_init_vars(&lvars);
4039 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4040 lproc_osc_attach_seqstat(obd);
4041 sptlrpc_lprocfs_cliobd_attach(obd);
4042 ptlrpc_lprocfs_register_obd(obd);
4046 /* We need to allocate a few requests more, because
4047 brw_interpret tries to create new requests before freeing
4048 previous ones. Ideally we want to have 2x max_rpcs_in_flight
4049 reserved, but I afraid that might be too much wasted RAM
4050 in fact, so 2 is just my guess and still should work. */
4051 cli->cl_import->imp_rq_pool =
4052 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4054 ptlrpc_add_rqs_to_pool);
/* The cache created here is released again in osc_cleanup(). */
4055 cli->cl_cache = cache_create(obd);
4056 if (!cli->cl_cache) {
/*
 * Staged pre-cleanup of an OSC device.
 *
 *   OBD_CLEANUP_EARLY   - deactivate the import and clear imp_pingable
 *                         under imp_lock.
 *   OBD_CLEANUP_EXPORTS - if cl_import is still set, invalidate it, free
 *                         its request pool, destroy it and clear the
 *                         pointer; then call obd_llog_finish() and log an
 *                         error on the failure path.
 *
 * obd:   device being cleaned up
 * stage: which cleanup stage is running
 *
 * NOTE(review): the switch header, break statements, any other stages and
 * the return statement are on lines not shown in this excerpt.
 */
4065 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4071 case OBD_CLEANUP_EARLY: {
4072 struct obd_import *imp;
4073 imp = obd->u.cli.cl_import;
4074 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4075 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4076 ptlrpc_deactivate_import(imp);
4077 spin_lock(&imp->imp_lock);
4078 imp->imp_pingable = 0;
4079 spin_unlock(&imp->imp_lock);
4082 case OBD_CLEANUP_EXPORTS: {
4083 /* If we set up but never connected, the
4084 client import will not have been cleaned. */
4085 if (obd->u.cli.cl_import) {
4086 struct obd_import *imp;
4087 imp = obd->u.cli.cl_import;
4088 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4090 ptlrpc_invalidate_import(imp);
/* Free the pool that osc_setup() attached to the import. */
4091 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4092 class_destroy_import(imp);
/* Clear the pointer now that the import has been destroyed. */
4093 obd->u.cli.cl_import = NULL;
4095 rc = obd_llog_finish(obd, 0);
4097 CERROR("failed to cleanup llogging subsystems\n");
/*
 * Final cleanup of an OSC device.
 *
 * Visible steps, in order: unregister the ptlrpc lprocfs entries and the
 * device's lprocfs tree, switch the object creator from RECOVERING to
 * EXITING under oscc_lock, release the quota cache, destroy the extent
 * cache and run the generic client_obd_cleanup().
 *
 * obd: device being cleaned up
 *
 * NOTE(review): anything after client_obd_cleanup(), including the return
 * statement, is on lines not shown in this excerpt.
 */
4104 int osc_cleanup(struct obd_device *obd)
4106 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4110 ptlrpc_lprocfs_unregister_obd(obd);
4111 lprocfs_obd_cleanup(obd);
/* Both flag changes are made in one critical section. */
4113 spin_lock(&oscc->oscc_lock);
4114 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4115 oscc->oscc_flags |= OSCC_FLAG_EXITING;
4116 spin_unlock(&oscc->oscc_lock);
4118 /* free memory of osc quota cache */
4119 lquota_cleanup(quota_interface, obd);
/* Counterpart of the cache_create() call in osc_setup(). */
4121 cache_destroy(obd->u.cli.cl_cache);
4122 rc = client_obd_cleanup(obd);
/*
 * Register a page-removal callback on this OSC's extent cache by
 * forwarding to cache_add_extent_removal_cb().
 *
 * exp:    export whose obd owns the cache
 * func:   page-removal callback to add
 * pin_cb: extent-pin callback (presumably forwarded as well; the rest of
 *         the call is on a line not shown in this excerpt)
 *
 * Returns whatever cache_add_extent_removal_cb() returns.
 */
4128 static int osc_register_page_removal_cb(struct obd_export *exp,
4129 obd_page_removal_cb_t func,
4130 obd_pin_extent_cb pin_cb)
4132 return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
/*
 * Remove a page-removal callback from this OSC's extent cache.
 *
 * exp:  export whose obd owns the cache
 * func: callback to remove
 *
 * Returns whatever cache_del_extent_removal_cb() returns.
 */
4136 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4137 obd_page_removal_cb_t func)
4139 return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
/*
 * Install the lock-cancel callback for this OSC.  The device holds a
 * single callback pointer, and it is asserted to be unset before cb is
 * stored.
 *
 * exp: export whose obd receives the callback
 * cb:  callback to install
 *
 * NOTE(review): the return statement is on a line not shown in this excerpt.
 */
4142 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4143 obd_lock_cancel_cb cb)
4145 LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4147 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
/*
 * Remove the lock-cancel callback from this OSC.
 *
 * If cb is not the callback currently installed, an error naming both
 * pointers is logged; otherwise the stored pointer is reset to NULL.
 *
 * exp: export whose obd holds the callback
 * cb:  callback the caller believes is installed
 *
 * NOTE(review): the return statements (including what the mismatch branch
 * returns) are on lines not shown in this excerpt.
 */
4151 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4152 obd_lock_cancel_cb cb)
4154 if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4155 CERROR("Unregistering cancel cb %p, while only %p was "
4157 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4161 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
/*
 * Apply a configuration record to this OSC.
 *
 * LCFG_SPTLRPC_CONF records go to sptlrpc_cliobd_process_config(); the
 * other visible branch hands the record to class_process_proc_param() with
 * the PARAM_OSC prefix and the OSC's lprocfs variable table.
 *
 * obd: device being configured
 * len: not referenced in the visible lines
 * buf: the configuration record, interpreted as struct lustre_cfg
 *
 * NOTE(review): the case label guarding class_process_proc_param(), the
 * rest of that call and the return statement are on lines not shown in
 * this excerpt.
 */
4165 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4167 struct lustre_cfg *lcfg = buf;
4168 struct lprocfs_static_vars lvars = { 0 };
/* Populate lvars so that lvars.obd_vars can be passed on below. */
4171 lprocfs_osc_init_vars(&lvars);
4173 switch (lcfg->lcfg_command) {
4174 case LCFG_SPTLRPC_CONF:
4175 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4178 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * Method table of the OSC device type; osc_init() passes it to
 * class_register_type().  Connection add/delete/connect use the generic
 * client_* implementations; every other visible slot points at an osc_*
 * function.
 */
4186 struct obd_ops osc_obd_ops = {
4187 .o_owner = THIS_MODULE,
/* device lifecycle */
4188 .o_setup = osc_setup,
4189 .o_precleanup = osc_precleanup,
4190 .o_cleanup = osc_cleanup,
/* connection management */
4191 .o_add_conn = client_import_add_conn,
4192 .o_del_conn = client_import_del_conn,
4193 .o_connect = client_connect_import,
4194 .o_reconnect = osc_reconnect,
4195 .o_disconnect = osc_disconnect,
4196 .o_statfs = osc_statfs,
4197 .o_statfs_async = osc_statfs_async,
/* striping metadata and object operations */
4198 .o_packmd = osc_packmd,
4199 .o_unpackmd = osc_unpackmd,
4200 .o_precreate = osc_precreate,
4201 .o_create = osc_create,
4202 .o_destroy = osc_destroy,
4203 .o_getattr = osc_getattr,
4204 .o_getattr_async = osc_getattr_async,
4205 .o_setattr = osc_setattr,
4206 .o_setattr_async = osc_setattr_async,
/* bulk and asynchronous page I/O */
4208 .o_brw_async = osc_brw_async,
4209 .o_prep_async_page = osc_prep_async_page,
4210 .o_reget_short_lock = osc_reget_short_lock,
4211 .o_release_short_lock = osc_release_short_lock,
4212 .o_queue_async_io = osc_queue_async_io,
4213 .o_set_async_flags = osc_set_async_flags,
4214 .o_queue_group_io = osc_queue_group_io,
4215 .o_trigger_group_io = osc_trigger_group_io,
4216 .o_teardown_async_page = osc_teardown_async_page,
4217 .o_punch = osc_punch,
/* locking */
4219 .o_enqueue = osc_enqueue,
4220 .o_match = osc_match,
4221 .o_change_cbdata = osc_change_cbdata,
4222 .o_cancel = osc_cancel,
4223 .o_cancel_unused = osc_cancel_unused,
/* control, info keys, import events, llog and configuration */
4224 .o_iocontrol = osc_iocontrol,
4225 .o_get_info = osc_get_info,
4226 .o_set_info_async = osc_set_info_async,
4227 .o_import_event = osc_import_event,
4228 .o_llog_init = osc_llog_init,
4229 .o_llog_finish = osc_llog_finish,
4230 .o_process_config = osc_process_config,
/* cache and lock-cancel callback registration */
4231 .o_register_page_removal_cb = osc_register_page_removal_cb,
4232 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4233 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4234 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
/*
 * Module initialisation for the OSC.
 *
 * Visible steps: initialise the lprocfs variable tables, request the
 * "lquota" module, look up osc_quota_interface and hand it to
 * lquota_init() and init_obd_quota_ops(), then register the OSC device
 * type under LUSTRE_OSC_NAME with osc_obd_ops.  The symbol reference is
 * dropped again in the visible PORTAL_SYMBOL_PUT() branch (presumably the
 * registration-failure path; the guarding test is not shown).
 *
 * NOTE(review): quota_interface is checked for NULL before the PUT but is
 * passed to lquota_init()/init_obd_quota_ops() without a visible check -
 * confirm those helpers accept NULL.
 * NOTE(review): the return statement is on a line not shown in this excerpt.
 */
4236 int __init osc_init(void)
4238 struct lprocfs_static_vars lvars = { 0 };
4242 lprocfs_osc_init_vars(&lvars);
/* The return value of request_module() is not examined here. */
4244 request_module("lquota");
4245 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4246 lquota_init(quota_interface);
4247 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4249 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4250 LUSTRE_OSC_NAME, NULL);
4252 if (quota_interface)
4253 PORTAL_SYMBOL_PUT(osc_quota_interface);
/*
 * Module teardown for the OSC: shut down the quota interface, drop the
 * symbol reference taken in osc_init() if one is held, and unregister the
 * LUSTRE_OSC_NAME device type.
 */
4261 static void /*__exit*/ osc_exit(void)
4263 lquota_exit(quota_interface);
4264 if (quota_interface)
4265 PORTAL_SYMBOL_PUT(osc_quota_interface);
4267 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the declaration of the "osc" module with its init and exit handlers. */
4270 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4271 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4272 MODULE_LICENSE("GPL");
4274 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);