1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
/* Quota interface hook, consulted by osc_brw_fini_request() via
 * lquota_setdq().  The assignment from osc_quota_interface happens at
 * module init (not visible in this chunk). */
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
/* Forward declaration; defined later in this file. */
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 /* Pack OSC object metadata for disk storage (LE byte order). */
/* NOTE(review): this listing is elided (gaps in the embedded numbering);
 * the guard conditions around the free/alloc paths are not visible. */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68 struct lov_stripe_md *lsm)
73 lmm_size = sizeof(**lmmp);
/* Free path (guard condition elided): caller asked to release *lmmp. */
78 OBD_FREE(*lmmp, lmm_size);
/* Alloc path (guard condition elided): allocate the on-disk buffer. */
84 OBD_ALLOC(*lmmp, lmm_size);
/* id/group must be valid before converting to little-endian order. */
90 LASSERT(lsm->lsm_object_id);
91 LASSERT(lsm->lsm_object_gr);
92 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101 struct lov_mds_md *lmm, int lmm_bytes)
/* Reject buffers too small to hold a lov_mds_md. */
107 if (lmm_bytes < sizeof (*lmm)) {
108 CERROR("lov_mds_md too small: %d, need %d\n",
109 lmm_bytes, (int)sizeof(*lmm));
112 /* XXX LOV_MAGIC etc check? */
114 if (lmm->lmm_object_id == 0) {
115 CERROR("lov_mds_md: zero lmm_object_id\n");
/* An OSC-level lsm always describes exactly one stripe. */
120 lsm_size = lov_stripe_md_size(1);
/* lmm == NULL with an existing *lsmp means "free the md". */
124 if (*lsmp != NULL && lmm == NULL) {
125 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126 OBD_FREE(*lsmp, lsm_size);
/* Allocation path (guard condition elided in this chunk). */
132 OBD_ALLOC(*lsmp, lsm_size);
135 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136 if ((*lsmp)->lsm_oinfo[0] == NULL) {
/* Unwind the lsm allocation on oinfo alloc failure. */
137 OBD_FREE(*lsmp, lsm_size);
140 loi_init((*lsmp)->lsm_oinfo[0]);
144 /* XXX zero *lsmp? */
145 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147 LASSERT((*lsmp)->lsm_object_id);
148 LASSERT((*lsmp)->lsm_object_gr);
/* Single stripe, so the object size limit is the per-stripe limit. */
151 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Copy a client capability (if any) into the request capsule and flag
 * its presence in the ost_body.  The capa == NULL early-return is
 * elided in this chunk — TODO confirm against full source. */
156 static inline void osc_pack_capa(struct ptlrpc_request *req,
157 struct ost_body *body, void *capa)
159 struct obd_capa *oc = (struct obd_capa *)capa;
160 struct lustre_capa *c;
165 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
/* Tell the OST a capability accompanies this request. */
168 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's ost_body from @oinfo: copy the obdo wholesale and
 * pack the capability, if one was supplied. */
172 static inline void osc_pack_req_body(struct ptlrpc_request *req,
173 struct obd_info *oinfo)
175 struct ost_body *body;
177 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
180 body->oa = *oinfo->oi_oa;
181 osc_pack_capa(req, body, oinfo->oi_capa);
/* Shrink the capability field to zero when no capability will be sent;
 * otherwise leave the capsule's default size in place. */
184 static inline void osc_set_capa_size(struct ptlrpc_request *req,
185 const struct req_msg_field *field,
/* No-capability path (guard condition elided in this chunk). */
189 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
191 /* it is already calculated as sizeof struct obd_capa */
/* Reply callback for async getattr: unpack the ost_body, copy the
 * returned attributes into the caller's obdo, then run the up-call. */
195 static int osc_getattr_interpret(struct ptlrpc_request *req,
196 struct osc_async_args *aa, int rc)
198 struct ost_body *body;
204 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
205 lustre_swab_ost_body);
207 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
208 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
210 /* This should really be sent by the OST */
211 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
212 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Unpack-failure path (guard elided): invalidate the attributes. */
214 CDEBUG(D_INFO, "can't unpack ost_body\n");
216 aa->aa_oi->oi_oa->o_valid = 0;
/* Deliver the result via the caller's completion callback. */
219 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an OST_GETATTR on @set; the reply is handled by
 * osc_getattr_interpret() and delivered via oinfo->oi_cb_up(). */
223 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
224 struct ptlrpc_request_set *set)
226 struct ptlrpc_request *req;
227 struct osc_async_args *aa;
231 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
235 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
236 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Pack-failure path (guard elided): release the request. */
238 ptlrpc_request_free(req);
242 osc_pack_req_body(req, oinfo);
244 ptlrpc_request_set_replen(req);
245 req->rq_interpret_reply = osc_getattr_interpret;
/* Stash the async args in the request's embedded scratch space. */
247 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
248 aa = (struct osc_async_args *)&req->rq_async_args;
251 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR: send, wait, and copy the returned
 * attributes into oinfo->oi_oa. */
255 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
257 struct ptlrpc_request *req;
258 struct ost_body *body;
262 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
266 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
267 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Pack-failure path (guard elided). */
269 ptlrpc_request_free(req);
273 osc_pack_req_body(req, oinfo);
275 ptlrpc_request_set_replen(req);
277 rc = ptlrpc_queue_wait(req);
281 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol error. */
283 GOTO(out, rc = -EPROTO);
285 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
286 *oinfo->oi_oa = body->oa;
288 /* This should really be sent by the OST */
289 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
290 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
294 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and copy the server's view back on success. */
298 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
299 struct obd_trans_info *oti)
301 struct ptlrpc_request *req;
302 struct ost_body *body;
/* If a group is claimed valid it must be non-zero. */
306 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
307 oinfo->oi_oa->o_gr > 0);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
313 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
314 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Pack-failure path (guard elided). */
316 ptlrpc_request_free(req);
320 osc_pack_req_body(req, oinfo);
322 ptlrpc_request_set_replen(req);
325 rc = ptlrpc_queue_wait(req);
329 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol error. */
331 GOTO(out, rc = -EPROTO);
333 *oinfo->oi_oa = body->oa;
337 ptlrpc_req_finished(req);
/* Reply callback for async setattr: copy the server's obdo back into
 * the caller's oinfo and run the completion up-call. */
341 static int osc_setattr_interpret(struct ptlrpc_request *req,
342 struct osc_async_args *aa, int rc)
344 struct ost_body *body;
350 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
352 GOTO(out, rc = -EPROTO);
354 *aa->aa_oi->oi_oa = body->oa;
356 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_SETATTR.  With a llog cookie (MDS-originated
 * setattr) the request is fired via ptlrpcd without waiting for the
 * reply; otherwise it is added to @rqset with osc_setattr_interpret()
 * as the completion handler. */
360 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
361 struct obd_trans_info *oti,
362 struct ptlrpc_request_set *rqset)
364 struct ptlrpc_request *req;
365 struct osc_async_args *aa;
369 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
373 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
374 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Pack-failure path (guard elided). */
376 ptlrpc_request_free(req);
380 osc_pack_req_body(req, oinfo);
382 ptlrpc_request_set_replen(req);
/* Carry the llog cancel cookie so the OST can ack the MDS record. */
384 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
386 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
389 /* do mds to ost setattr asynchronously */
391 /* Do not wait for response. */
392 ptlrpcd_add_req(req);
394 req->rq_interpret_reply = osc_setattr_interpret;
396 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
397 aa = (struct osc_async_args *)&req->rq_async_args;
400 ptlrpc_set_add_req(rqset, req);
/* Create an object on the OST.  Allocates a temporary single-stripe
 * lsm, sends OST_CREATE synchronously, and returns the new object's
 * id/group to the caller through *ea/oa.  DELORPHAN variants (orphan
 * cleanup at MDS-OST reconnect) are never resent or delayed. */
406 int osc_real_create(struct obd_export *exp, struct obdo *oa,
407 struct lov_stripe_md **ea, struct obd_trans_info *oti)
409 struct ptlrpc_request *req;
410 struct ost_body *body;
411 struct lov_stripe_md *lsm;
420 rc = obd_alloc_memmd(exp, &lsm);
425 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
/* Alloc-failure path (guard elided). */
427 GOTO(out, rc = -ENOMEM);
429 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
431 ptlrpc_request_free(req);
435 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
439 ptlrpc_request_set_replen(req);
/* DELORPHAN is signalled via OBD_MD_FLINLINE + OBD_FL_DELORPHAN. */
441 if (oa->o_valid & OBD_MD_FLINLINE) {
442 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
443 oa->o_flags == OBD_FL_DELORPHAN);
445 "delorphan from OST integration");
446 /* Don't resend the delorphan req */
447 req->rq_no_resend = req->rq_no_delay = 1;
450 rc = ptlrpc_queue_wait(req);
454 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
456 GOTO(out_req, rc = -EPROTO);
460 /* This should really be sent by the OST */
461 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
462 oa->o_valid |= OBD_MD_FLBLKSZ;
464 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
465 * have valid lsm_oinfo data structs, so don't go touching that.
466 * This needs to be fixed in a big way.
468 lsm->lsm_object_id = oa->o_id;
469 lsm->lsm_object_gr = oa->o_gr;
/* Record the create transno so recovery can replay in order. */
473 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
/* Save the llog cookie the caller will need for unlink cancellation. */
475 if (oa->o_valid & OBD_MD_FLCOOKIE) {
476 if (!oti->oti_logcookies)
477 oti_alloc_cookies(oti, 1);
478 *oti->oti_logcookies = *obdo_logcookie(oa);
482 CDEBUG(D_HA, "transno: "LPD64"\n",
483 lustre_msg_get_transno(req->rq_repmsg));
485 ptlrpc_req_finished(req);
488 obd_free_memmd(exp, &lsm);
/* Reply callback for OST_PUNCH: copy the post-punch obdo back into the
 * caller's oinfo and run the completion up-call. */
492 static int osc_punch_interpret(struct ptlrpc_request *req,
493 struct osc_async_args *aa, int rc)
495 struct ost_body *body;
501 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
503 GOTO(out, rc = -EPROTO);
505 *aa->aa_oi->oi_oa = body->oa;
507 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_PUNCH (truncate an extent).  The punch range rides
 * in the oa's size/blocks fields as start/end offsets. */
511 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
512 struct obd_trans_info *oti,
513 struct ptlrpc_request_set *rqset)
515 struct ptlrpc_request *req;
516 struct osc_async_args *aa;
517 struct ost_body *body;
/* NULL-oa path (guard elided). */
522 CDEBUG(D_INFO, "oa NULL\n");
526 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
530 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
531 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
533 ptlrpc_request_free(req);
536 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
537 osc_pack_req_body(req, oinfo);
539 /* overload the size and blocks fields in the oa with start/end */
540 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
542 body->oa.o_size = oinfo->oi_policy.l_extent.start;
543 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
544 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
545 ptlrpc_request_set_replen(req);
548 req->rq_interpret_reply = osc_punch_interpret;
549 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
550 aa = (struct osc_async_args *)&req->rq_async_args;
552 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_SYNC: ask the OST to commit the [start, end] range
 * of the object to stable storage.  The range rides in oa size/blocks. */
557 static int osc_sync(struct obd_export *exp, struct obdo *oa,
558 struct lov_stripe_md *md, obd_size start, obd_size end,
561 struct ptlrpc_request *req;
562 struct ost_body *body;
/* NULL-oa path (guard elided). */
567 CDEBUG(D_INFO, "oa NULL\n");
571 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
575 osc_set_capa_size(req, &RMF_CAPA1, capa);
576 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
578 ptlrpc_request_free(req);
582 /* overload the size and blocks fields in the oa with start/end */
583 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
586 body->oa.o_size = start;
587 body->oa.o_blocks = end;
588 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
589 osc_pack_capa(req, body, capa);
591 ptlrpc_request_set_replen(req);
593 rc = ptlrpc_queue_wait(req);
597 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* Missing/short reply body is a protocol error. */
599 GOTO(out, rc = -EPROTO);
605 ptlrpc_req_finished(req);
609 /* Find and cancel locally locks matched by @mode in the resource found by
610 * @objid. Found locks are added into @cancel list. Returns the amount of
611 * locks added to @cancels list. */
612 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
613 struct list_head *cancels, ldlm_mode_t mode,
/* The LDLM resource name is built from the object id and group. */
616 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
617 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
618 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
625 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
626 lock_flags, 0, NULL);
/* Drop the reference taken by ldlm_resource_get(). */
627 ldlm_resource_putref(res);
/* Completion handler for OST_DESTROY: release our slot in the
 * destroy-in-flight throttle and wake any blocked destroyer. */
631 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
634 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
636 atomic_dec(&cli->cl_destroy_in_flight);
637 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Throttle destroys to cl_max_rpcs_in_flight.  Optimistically grabs a
 * slot; on overshoot it gives the slot back and, if the counter moved
 * in between, re-signals the waitq to avoid a lost wakeup against a
 * concurrent osc_destroy_interpret(). */
641 static int osc_can_send_destroy(struct client_obd *cli)
643 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
644 cli->cl_max_rpcs_in_flight) {
645 /* The destroy request can be sent */
/* Overshoot: release the optimistically taken slot. */
648 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
649 cli->cl_max_rpcs_in_flight) {
651 * The counter has been modified between the two atomic
654 cfs_waitq_signal(&cli->cl_destroy_waitq);
659 /* Destroy requests can be async always on the client, and we don't even really
660 * care about the return code since the client cannot do anything at all about
662 * When the MDS is unlinking a filename, it saves the file objects into a
663 * recovery llog, and these object records are cancelled when the OST reports
664 * they were destroyed and sync'd to disk (i.e. transaction committed).
665 * If the client dies, or the OST is down when the object should be destroyed,
666 * the records are not cancelled, and when the OST reconnects to the MDS next,
667 * it will retrieve the llog unlink logs and then sends the log cancellation
668 * cookies to the MDS after committing destroy transactions. */
669 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
670 struct lov_stripe_md *ea, struct obd_trans_info *oti,
671 struct obd_export *md_export)
673 struct client_obd *cli = &exp->exp_obd->u.cli;
674 struct ptlrpc_request *req;
675 struct ost_body *body;
676 CFS_LIST_HEAD(cancels);
/* NULL-oa path (guard elided). */
681 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel local PW locks first; their cached pages are being discarded. */
685 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
686 LDLM_FL_DISCARD_DATA);
688 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Alloc-failure path (guard elided): put back the collected locks. */
690 ldlm_lock_list_put(&cancels, l_bl_ast, count);
/* Piggy-back the collected lock cancels on the destroy request. */
694 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
697 ptlrpc_request_free(req);
701 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
702 req->rq_interpret_reply = osc_destroy_interpret;
/* Carry the MDS llog cookie so the OST can cancel the unlink record. */
704 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
705 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
706 sizeof(*oti->oti_logcookies));
707 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
711 ptlrpc_request_set_replen(req);
/* Throttle: block until a destroy slot is available. */
713 if (!osc_can_send_destroy(cli)) {
714 struct l_wait_info lwi = { 0 };
717 * Wait until the number of on-going destroy RPCs drops
718 * under max_rpc_in_flight
720 l_wait_event_exclusive(cli->cl_destroy_waitq,
721 osc_can_send_destroy(cli), &lwi);
724 /* Do not wait for response */
725 ptlrpcd_add_req(req);
/* Report this client's dirty-cache and grant accounting to the OST in
 * the outgoing oa, and request new "undirty" grant headroom. */
729 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
732 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* Caller must not have filled these fields already. */
734 LASSERT(!(oa->o_valid & bits));
737 client_obd_list_lock(&cli->cl_loi_list_lock);
738 oa->o_dirty = cli->cl_dirty;
/* Sanity-check the dirty accounting before advertising it. */
739 if (cli->cl_dirty > cli->cl_dirty_max) {
740 CERROR("dirty %lu > dirty_max %lu\n",
741 cli->cl_dirty, cli->cl_dirty_max);
743 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
744 CERROR("dirty %d > system dirty_max %d\n",
745 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
747 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
748 CERROR("dirty %lu - dirty_max %lu too big???\n",
749 cli->cl_dirty, cli->cl_dirty_max);
/* Ask for enough grant to cover a full pipeline of write RPCs. */
752 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
753 (cli->cl_max_rpcs_in_flight + 1);
754 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
756 oa->o_grant = cli->cl_avail_grant;
757 oa->o_dropped = cli->cl_lost_grant;
/* Lost grant has been reported; reset the counter. */
758 cli->cl_lost_grant = 0;
759 client_obd_list_unlock(&cli->cl_loi_list_lock);
760 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
761 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
764 /* caller must hold loi_list_lock */
/* Account one page of dirty cache against the client's write grant. */
765 static void osc_consume_write_grant(struct client_obd *cli,
766 struct brw_page *pga)
768 atomic_inc(&obd_dirty_pages);
769 cli->cl_dirty += CFS_PAGE_SIZE;
770 cli->cl_avail_grant -= CFS_PAGE_SIZE;
/* Mark the page so the release path knows grant was consumed. */
771 pga->flag |= OBD_BRW_FROM_GRANT;
772 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
773 CFS_PAGE_SIZE, pga, pga->pg);
774 LASSERT(cli->cl_avail_grant >= 0);
777 /* the companion to osc_consume_write_grant, called when a brw has completed.
778 * must be called with the loi lock held. */
779 static void osc_release_write_grant(struct client_obd *cli,
780 struct brw_page *pga, int sent)
/* Server block size; fall back to 4096 if statfs hasn't filled it. */
782 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
/* Nothing to release if the page never consumed grant. */
785 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
790 pga->flag &= ~OBD_BRW_FROM_GRANT;
791 atomic_dec(&obd_dirty_pages);
792 cli->cl_dirty -= CFS_PAGE_SIZE;
/* Unsent-page path (guard elided): its whole grant is lost. */
794 cli->cl_lost_grant += CFS_PAGE_SIZE;
795 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
796 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
797 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
798 /* For short writes we shouldn't count parts of pages that
799 * span a whole block on the OST side, or our accounting goes
800 * wrong. Should match the code in filter_grant_check. */
801 int offset = pga->off & ~CFS_PAGE_MASK;
802 int count = pga->count + (offset & (blocksize - 1));
803 int end = (offset + pga->count) & (blocksize - 1);
/* Round the short write up to whole server blocks (guard elided). */
805 count += blocksize - end;
807 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
808 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
809 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
810 cli->cl_avail_grant, cli->cl_dirty);
816 static unsigned long rpcs_in_flight(struct client_obd *cli)
818 return cli->cl_r_in_flight + cli->cl_w_in_flight;
821 /* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake waiters that can now either dirty
 * more pages (grant available) or must fall back to sync IO (-EDQUOT). */
822 void osc_wake_cache_waiters(struct client_obd *cli)
824 struct list_head *l, *tmp;
825 struct osc_cache_waiter *ocw;
828 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
829 /* if we can't dirty more, we must wait until some is written */
830 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
831 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
832 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
833 "osc max %ld, sys max %d\n", cli->cl_dirty,
834 cli->cl_dirty_max, obd_max_dirty_pages);
838 /* if still dirty cache but no grant wait for pending RPCs that
839 * may yet return us some grant before doing sync writes */
840 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
841 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
842 cli->cl_w_in_flight);
/* This waiter will be woken one way or the other; unlink it. */
846 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
847 list_del_init(&ocw->ocw_entry);
848 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
849 /* no more RPCs in flight to return grant, do sync IO */
850 ocw->ocw_rc = -EDQUOT;
851 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
/* Grant available (guard elided): charge it to this waiter's page. */
853 osc_consume_write_grant(cli,
854 &ocw->ocw_oap->oap_brw_page);
857 cfs_waitq_signal(&ocw->ocw_waitq);
/* Seed the client's available grant from the server's connect data. */
863 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
865 client_obd_list_lock(&cli->cl_loi_list_lock);
866 cli->cl_avail_grant = ocd->ocd_grant;
867 client_obd_list_unlock(&cli->cl_loi_list_lock);
869 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
870 cli->cl_avail_grant, cli->cl_lost_grant);
871 LASSERT(cli->cl_avail_grant >= 0);
/* Fold any grant returned in a BRW reply into cl_avail_grant. */
874 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
876 client_obd_list_lock(&cli->cl_loi_list_lock);
877 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
878 if (body->oa.o_valid & OBD_MD_FLGRANT)
879 cli->cl_avail_grant += body->oa.o_grant;
880 /* waiters are woken in brw_interpret_oap */
881 client_obd_list_unlock(&cli->cl_loi_list_lock);
884 /* We assume that the reason this OSC got a short read is because it read
885 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
886 * via the LOV, and it _knows_ it's reading inside the file, it's just that
887 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the unread tail of the bulk: the partially-read page's
 * remainder plus every wholly-unread page. */
888 static void handle_short_read(int nob_read, obd_count page_count,
889 struct brw_page **pga)
894 /* skip bytes read OK */
895 while (nob_read > 0) {
896 LASSERT (page_count > 0);
898 if (pga[i]->count > nob_read) {
899 /* EOF inside this page */
900 ptr = cfs_kmap(pga[i]->pg) +
901 (pga[i]->off & ~CFS_PAGE_MASK);
/* Zero the tail of the partially-read page. */
902 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
903 cfs_kunmap(pga[i]->pg);
909 nob_read -= pga[i]->count;
914 /* zero remaining pages */
915 while (page_count-- > 0) {
916 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
917 memset(ptr, 0, pga[i]->count);
918 cfs_kunmap(pga[i]->pg);
/* Validate the per-niobuf return codes in a BRW_WRITE reply and check
 * that the bulk transferred exactly the requested byte count. */
923 static int check_write_rcs(struct ptlrpc_request *req,
924 int requested_nob, int niocount,
925 obd_count page_count, struct brw_page **pga)
929 /* return error if any niobuf was in error */
930 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
931 sizeof(*remote_rcs) * niocount, NULL);
932 if (remote_rcs == NULL) {
933 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
/* Byte-swap the rc vector if the reply came in foreign endianness. */
936 if (lustre_msg_swabbed(req->rq_repmsg))
937 for (i = 0; i < niocount; i++)
938 __swab32s(&remote_rcs[i]);
940 for (i = 0; i < niocount; i++) {
941 if (remote_rcs[i] < 0)
942 return(remote_rcs[i]);
/* Non-zero positive rcs are not valid for writes. */
944 if (remote_rcs[i] != 0) {
945 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
946 i, remote_rcs[i], req);
951 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
952 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
953 requested_nob, req->rq_bulk->bd_nob_transferred);
/* Two brw_pages can share one niobuf iff they are file-contiguous and
 * carry compatible flags (OBD_BRW_FROM_GRANT is local accounting only
 * and is masked out of the comparison). */
960 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
962 if (p1->flag != p2->flag) {
963 unsigned mask = ~OBD_BRW_FROM_GRANT;
965 /* warn if we try to combine flags that we don't know to be
967 if ((p1->flag & mask) != (p2->flag & mask))
968 CERROR("is it ok to have flags 0x%x and 0x%x in the "
969 "same brw?\n", p1->flag, p2->flag);
/* Mergeable only when p2 starts exactly where p1 ends. */
973 return (p1->off + p1->count == p2->off);
/* CRC32 (crc32_le) over the first @nob bytes of the bulk described by
 * @pga.  Fault-injection hooks corrupt the data (reads) or skew the
 * checksum (writes) to exercise the mismatch-handling paths. */
976 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
977 struct brw_page **pga, int opc)
982 LASSERT (pg_count > 0);
983 while (nob > 0 && pg_count > 0) {
984 unsigned char *ptr = cfs_kmap(pga[i]->pg);
985 int off = pga[i]->off & ~CFS_PAGE_MASK;
/* Only checksum up to the remaining byte budget. */
986 int count = pga[i]->count > nob ? nob : pga[i]->count;
988 /* corrupt the data before we compute the checksum, to
989 * simulate an OST->client data error */
990 if (i == 0 && opc == OST_READ &&
991 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
992 memcpy(ptr + off, "bad1", min(4, nob));
993 cksum = crc32_le(cksum, ptr + off, count);
994 cfs_kunmap(pga[i]->pg);
995 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
998 nob -= pga[i]->count;
1002 /* For sending we only compute the wrong checksum instead
1003 * of corrupting the data so it is still correct on a redo */
1004 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a BRW (bulk read/write) request for @page_count pages:
 * allocate the request (from the pre-allocated pool for writes, so
 * writeback can proceed under memory pressure), coalesce contiguous
 * pages into niobufs, attach the bulk descriptor, fill grant/checksum
 * fields, and return the ready-to-send request in *reqp. */
1010 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1011 struct lov_stripe_md *lsm, obd_count page_count,
1012 struct brw_page **pga,
1013 struct ptlrpc_request **reqp,
1014 struct obd_capa *ocapa)
1016 struct ptlrpc_request *req;
1017 struct ptlrpc_bulk_desc *desc;
1018 struct ost_body *body;
1019 struct obd_ioobj *ioobj;
1020 struct niobuf_remote *niobuf;
1021 int niocount, i, requested_nob, opc, rc;
1022 struct osc_brw_async_args *aa;
1023 struct req_capsule *pill;
/* Fault-injection points for request-preparation failures. */
1026 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1027 RETURN(-ENOMEM); /* Recoverable */
1028 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1029 RETURN(-EINVAL); /* Fatal */
/* Writes come from the import's request pool so that memory pressure
 * cannot deadlock writeback; reads use normal allocation. */
1031 if ((cmd & OBD_BRW_WRITE) != 0) {
1033 req = ptlrpc_request_alloc_pool(cli->cl_import,
1034 cli->cl_import->imp_rq_pool,
1038 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
/* One niobuf per run of mergeable (contiguous, same-flag) pages. */
1044 for (niocount = i = 1; i < page_count; i++) {
1045 if (!can_merge_pages(pga[i - 1], pga[i]))
1049 pill = &req->rq_pill;
1050 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1051 niocount * sizeof(*niobuf));
1052 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1054 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
/* Pack-failure path (guard elided). */
1056 ptlrpc_request_free(req);
1059 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
/* Bulk direction follows the opcode: client sources data on write,
 * sinks it on read. */
1061 if (opc == OST_WRITE)
1062 desc = ptlrpc_prep_bulk_imp(req, page_count,
1063 BULK_GET_SOURCE, OST_BULK_PORTAL);
1065 desc = ptlrpc_prep_bulk_imp(req, page_count,
1066 BULK_PUT_SINK, OST_BULK_PORTAL);
1069 GOTO(out, rc = -ENOMEM);
1070 /* NB request now owns desc and will free it when it gets freed */
1072 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1073 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1074 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1075 LASSERT(body && ioobj && niobuf);
1079 obdo_to_ioobj(oa, ioobj);
1080 ioobj->ioo_bufcnt = niocount;
1081 osc_pack_capa(req, body, ocapa);
1082 LASSERT (page_count > 0);
1083 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1084 struct brw_page *pg = pga[i];
1085 struct brw_page *pg_prev = pga[i - 1];
/* Each brw_page must fit within one page and pages must be in
 * strictly ascending file order. */
1087 LASSERT(pg->count > 0);
1088 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1089 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1090 pg->off, pg->count);
1092 LASSERTF(i == 0 || pg->off > pg_prev->off,
1093 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1094 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1096 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1097 pg_prev->pg, page_private(pg_prev->pg),
1098 pg_prev->pg->index, pg_prev->off);
1100 LASSERTF(i == 0 || pg->off > pg_prev->off,
1101 "i %d p_c %u\n", i, page_count);
/* All pages of a single BRW must agree on SRVLOCK. */
1103 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1104 (pg->flag & OBD_BRW_SRVLOCK));
1106 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1108 requested_nob += pg->count;
/* Extend the current niobuf or start a new one. */
1110 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1112 niobuf->len += pg->count;
1114 niobuf->offset = pg->off;
1115 niobuf->len = pg->count;
1116 niobuf->flags = pg->flag;
1120 LASSERT((void *)(niobuf - niocount) ==
1121 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1122 niocount * sizeof(*niobuf)));
1123 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1125 /* size[REQ_REC_OFF] still sizeof (*body) */
1126 if (opc == OST_WRITE) {
/* Software checksums only when the sec flavor isn't already
 * checksumming the bulk. */
1127 if (unlikely(cli->cl_checksum) &&
1128 req->rq_flvr.sf_bulk_csum == BULK_CSUM_ALG_NULL) {
1129 body->oa.o_valid |= OBD_MD_FLCKSUM;
1130 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1133 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1135 /* save this in 'oa', too, for later checking */
1136 oa->o_valid |= OBD_MD_FLCKSUM;
1138 /* clear out the checksum flag, in case this is a
1139 * resend but cl_checksum is no longer set. b=11238 */
1140 oa->o_valid &= ~OBD_MD_FLCKSUM;
1142 oa->o_cksum = body->oa.o_cksum;
1143 /* 1 RC per niobuf */
1144 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1145 sizeof(__u32) * niocount);
1147 if (unlikely(cli->cl_checksum) &&
1148 req->rq_flvr.sf_bulk_csum == BULK_CSUM_ALG_NULL)
1149 body->oa.o_valid |= OBD_MD_FLCKSUM;
1150 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1151 /* 1 RC for the whole I/O */
1153 ptlrpc_request_set_replen(req);
/* Record per-request bookkeeping for the reply interpreter. */
1155 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1156 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1158 aa->aa_requested_nob = requested_nob;
1159 aa->aa_nio_count = niocount;
1160 aa->aa_page_count = page_count;
1164 INIT_LIST_HEAD(&aa->aa_oaps);
/* Error path (labels elided): release the request. */
1170 ptlrpc_req_finished(req);
/* After a server-reported write-checksum mismatch, re-checksum the data
 * now to decide whether it changed client-side after the original
 * checksum (mmap race), in transit, or both, and log a console error
 * with the object/extent details. */
1174 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1175 __u32 client_cksum, __u32 server_cksum,
1176 int nob, obd_count page_count,
1177 struct brw_page **pga)
/* Agreement means no corruption after all. */
1182 if (server_cksum == client_cksum) {
1183 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1187 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
/* Classify the mismatch by comparing the fresh checksum with both
 * the original client value and the server's value. */
1189 if (new_cksum == server_cksum)
1190 msg = "changed on the client after we checksummed it - "
1191 "likely false positive due to mmap IO (bug 11742)";
1192 else if (new_cksum == client_cksum)
1193 msg = "changed in transit before arrival at OST";
1195 msg = "changed in transit AND doesn't match the original - "
1196 "likely false positive due to mmap IO (bug 11742)";
1198 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1199 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1200 "["LPU64"-"LPU64"]\n",
1201 msg, libcfs_nid2str(peer->nid),
1202 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1203 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1206 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1208 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1209 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1210 client_cksum, server_cksum, new_cksum);
1214 /* Note rc enters this function as number of bytes transferred */
/* Post-process a completed BRW: unpack the reply body, update quota
 * and grant state, verify write rcs / write checksum, and for reads
 * zero-fill short transfers and verify the read checksum.  Returns 0
 * or a negative errno. */
1215 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1217 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1218 const lnet_process_id_t *peer =
1219 &req->rq_import->imp_connection->c_peer;
1220 struct client_obd *cli = aa->aa_cli;
1221 struct ost_body *body;
1222 __u32 client_cksum = 0;
/* -EDQUOT still carries a reply body we must process (quota flags). */
1225 if (rc < 0 && rc != -EDQUOT)
1228 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1229 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1230 lustre_swab_ost_body);
1232 CDEBUG(D_INFO, "Can't unpack body\n");
1236 /* set/clear over quota flag for a uid/gid */
1237 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1238 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1239 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1240 body->oa.o_gid, body->oa.o_valid,
/* Remember the checksum we sent, for the write-mismatch check below. */
1246 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1247 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1249 osc_update_grant(cli, body);
1251 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* Writes must not report a positive byte count here. */
1253 CERROR("Unexpected +ve rc %d\n", rc);
1256 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1258 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1260 check_write_checksum(&body->oa, peer, client_cksum,
1262 aa->aa_requested_nob,
1267 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1270 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1271 aa->aa_page_count, aa->aa_ppga);
1275 /* The rest of this function executes only for OST_READs */
/* rc is the byte count transferred; sanity-check it. */
1276 if (rc > aa->aa_requested_nob) {
1277 CERROR("Unexpected rc %d (%d requested)\n", rc,
1278 aa->aa_requested_nob);
1282 if (rc != req->rq_bulk->bd_nob_transferred) {
1283 CERROR ("Unexpected rc %d (%d transferred)\n",
1284 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the unread tail (sparse stripe). */
1288 if (rc < aa->aa_requested_nob)
1289 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1291 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1293 GOTO(out, rc = -EAGAIN);
/* Verify the read data against the server-supplied checksum. */
1295 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1296 static int cksum_counter;
1297 __u32 server_cksum = body->oa.o_cksum;
1301 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1302 aa->aa_ppga, OST_READ);
/* Note whether the bulk came via an LNET router. */
1304 if (peer->nid == req->rq_bulk->bd_sender) {
1308 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1311 if (server_cksum == ~0 && rc > 0) {
1312 CERROR("Protocol error: server %s set the 'checksum' "
1313 "bit, but didn't send a checksum. Not fatal, "
1314 "but please tell CFS.\n",
1315 libcfs_nid2str(peer->nid));
1316 } else if (server_cksum != client_cksum) {
1317 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1318 "%s%s%s inum "LPU64"/"LPU64" object "
1319 LPU64"/"LPU64" extent "
1320 "["LPU64"-"LPU64"]\n",
1321 req->rq_import->imp_obd->obd_name,
1322 libcfs_nid2str(peer->nid),
1324 body->oa.o_valid & OBD_MD_FLFID ?
1325 body->oa.o_fid : (__u64)0,
1326 body->oa.o_valid & OBD_MD_FLFID ?
1327 body->oa.o_generation :(__u64)0,
1329 body->oa.o_valid & OBD_MD_FLGROUP ?
1330 body->oa.o_gr : (__u64)0,
1331 aa->aa_ppga[0]->off,
1332 aa->aa_ppga[aa->aa_page_count-1]->off +
1333 aa->aa_ppga[aa->aa_page_count-1]->count -
1335 CERROR("client %x, server %x\n",
1336 client_cksum, server_cksum);
1338 aa->aa_oa->o_cksum = client_cksum;
1342 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server didn't send one. */
1345 } else if (unlikely(client_cksum)) {
1346 static int cksum_missed;
/* Rate-limit: log only at power-of-two occurrence counts. */
1349 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1350 CERROR("Checksum %u requested from %s but not sent\n",
1351 cksum_missed, libcfs_nid2str(peer->nid));
/* Return the server's view of the obdo to the caller. */
1357 *aa->aa_oa = body->oa;
/*
 * Synchronous bulk read/write.  Builds a single BRW request, queues it
 * and waits for completion, then retries on bulk timeout or any
 * recoverable error, sleeping an increasing number of seconds between
 * resends.  NOTE(review): this excerpt elides interior lines (the
 * restart label, resend counter setup and final RETURN) — comments
 * below only describe what the visible lines show.
 */
1362 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1363 struct lov_stripe_md *lsm,
1364 obd_count page_count, struct brw_page **pga,
1365 struct obd_capa *ocapa)
1367 struct ptlrpc_request *req;
1371 struct l_wait_info lwi;
1375 cfs_waitq_init(&waitq);
1378 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1379 page_count, pga, &req, ocapa);
1383 rc = ptlrpc_queue_wait(req);
1385 if (rc == -ETIMEDOUT && req->rq_resend) {
/* Bulk transfer timed out: free the request and (in elided code,
 * presumably) rebuild and resend it — TODO confirm against full file. */
1386 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1387 ptlrpc_req_finished(req);
1391 rc = osc_brw_fini_request(req, rc);
1393 ptlrpc_req_finished(req);
1394 if (osc_recoverable_error(rc)) {
1396 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1397 CERROR("too many resend retries, returning error\n");
/* Back off: wait 'resends' seconds (interruptible) before retrying. */
1401 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1402 l_wait_event(waitq, 0, &lwi);
/*
 * Rebuild and resend a BRW request that failed with a recoverable
 * error.  A new request is prepared from the async args of the old
 * one; the oap list and the per-oap request references are transferred
 * to the new request, which is then added to the original request set.
 * Must tolerate a racing interruption: if any oap was interrupted we
 * abandon the redo (visible at lines 1448-1450).
 * NOTE(review): interior lines (resend-counter bump, capa offset
 * argument, RETURN paths) are elided in this excerpt.
 */
1410 int osc_brw_redo_request(struct ptlrpc_request *request,
1411 struct osc_brw_async_args *aa)
1413 struct ptlrpc_request *new_req;
1414 struct ptlrpc_request_set *set = request->rq_set;
1415 struct osc_brw_async_args *new_aa;
1416 struct osc_async_page *oap;
1420 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1421 CERROR("too many resend retries, returning error\n");
1425 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/* Recover the capability from the old request body if one was packed. */
1427 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1428 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1429 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1432 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1433 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1434 aa->aa_cli, aa->aa_oa,
1435 NULL /* lsm unused by osc currently */,
1436 aa->aa_page_count, aa->aa_ppga,
1437 &new_req, NULL /* ocapa */);
1441 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1443 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1444 if (oap->oap_request != NULL) {
/* Only one oap holds the request reference; it must be the old req. */
1445 LASSERTF(request == oap->oap_request,
1446 "request %p != oap_request %p\n",
1447 request, oap->oap_request);
1448 if (oap->oap_interrupted) {
/* Sync waiter was interrupted while we were off-lock building
 * new_req: drop it and bail out instead of resending. */
1449 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1450 ptlrpc_req_finished(new_req);
1455 /* New request takes over pga and oaps from old request.
1456 * Note that copying a list_head doesn't work, need to move it... */
1458 new_req->rq_interpret_reply = request->rq_interpret_reply;
1459 new_req->rq_async_args = request->rq_async_args;
/* Delay the resend by aa_resends seconds via the sent timestamp. */
1460 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1462 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1464 INIT_LIST_HEAD(&new_aa->aa_oaps);
1465 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1466 INIT_LIST_HEAD(&aa->aa_oaps);
1468 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1469 if (oap->oap_request) {
/* Swap the per-oap reference from the old request to the new one. */
1470 ptlrpc_req_finished(oap->oap_request);
1471 oap->oap_request = ptlrpc_request_addref(new_req);
1475 /* use ptlrpc_set_add_req is safe because interpret functions work
1476 * in check_set context. only one way exist with access to request
1477 * from different thread got -EINTR - this way protected with
1478 * cl_loi_list_lock */
1479 ptlrpc_set_add_req(set, new_req);
1481 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1483 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Interpret callback for BRW requests issued by async_internal().
 * Finishes the request, redoes it on recoverable errors, then drops
 * the in-flight counter, releases the write grants consumed for each
 * page and frees the ppga array.
 * NOTE(review): the early-return path after a successful redo and the
 * final RETURN are elided in this excerpt.
 */
1487 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1489 struct osc_brw_async_args *aa = data;
1493 rc = osc_brw_fini_request(req, rc);
1494 if (osc_recoverable_error(rc)) {
1495 rc = osc_brw_redo_request(req, aa);
1500 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
/* Decrement whichever in-flight counter this opcode contributed to. */
1501 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1502 aa->aa_cli->cl_w_in_flight--;
1504 aa->aa_cli->cl_r_in_flight--;
1505 for (i = 0; i < aa->aa_page_count; i++)
1506 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1507 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1509 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Asynchronous bulk read/write: build one BRW request and add it to
 * the caller's request set (completion handled by brw_interpret).
 * For writes, grant is consumed up-front for each page so the OST's
 * space accounting stays correct even for sync writes, and released
 * again on the failure path.
 * NOTE(review): the prep-request error check between lines 1536/1539
 * and the final RETURN are elided in this excerpt.
 */
1514 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1515 struct lov_stripe_md *lsm, obd_count page_count,
1516 struct brw_page **pga, struct ptlrpc_request_set *set,
1517 struct obd_capa *ocapa)
1519 struct ptlrpc_request *req;
1520 struct client_obd *cli = &exp->exp_obd->u.cli;
1522 struct osc_brw_async_args *aa;
1525 /* Consume write credits even if doing a sync write -
1526 * otherwise we may run out of space on OST due to grant. */
1527 if (cmd == OBD_BRW_WRITE) {
1528 spin_lock(&cli->cl_loi_list_lock);
1529 for (i = 0; i < page_count; i++) {
1530 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1531 osc_consume_write_grant(cli, pga[i]);
1533 spin_unlock(&cli->cl_loi_list_lock);
1536 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1539 aa = (struct osc_brw_async_args *)&req->rq_async_args;
/* Record per-RPC stats (page count, RPCs in flight, bytes) by dir. */
1540 if (cmd == OBD_BRW_READ) {
1541 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1542 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1543 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1545 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1546 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1547 cli->cl_w_in_flight);
1548 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1552 req->rq_interpret_reply = brw_interpret;
1553 ptlrpc_set_add_req(set, req);
1554 client_obd_list_lock(&cli->cl_loi_list_lock);
1555 if (cmd == OBD_BRW_READ)
1556 cli->cl_r_in_flight++;
1558 cli->cl_w_in_flight++;
1559 client_obd_list_unlock(&cli->cl_loi_list_lock);
1560 } else if (cmd == OBD_BRW_WRITE) {
/* Failure path for writes: give back the grant taken above. */
1561 client_obd_list_lock(&cli->cl_loi_list_lock);
1562 for (i = 0; i < page_count; i++)
1563 osc_release_write_grant(cli, pga[i], 0);
1564 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
1570 * ugh, we want disk allocation on the target to happen in offset order. we'll
1571 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1572 * fine for our small page arrays and doesn't require allocation. its an
1573 * insertion sort that swaps elements that are strides apart, shrinking the
1574 * stride down until its '1' and the array is sorted.
 */
/* Sort the page array in place by ascending file offset (->off).
 * NOTE(review): the stride-shrink and inner-assignment lines are
 * elided in this excerpt; only the gap-insertion core is visible. */
1576 static void sort_brw_pages(struct brw_page **array, int num)
1579 struct brw_page *tmp;
/* Grow the stride through the Knuth sequence 1, 4, 13, 40, ... */
1583 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort at the current stride. */
1588 for (i = stride ; i < num ; i++) {
1591 while (j >= stride && array[j - stride]->off > tmp->off) {
1592 array[j] = array[j - stride];
1597 } while (stride > 1);
/*
 * Return how many leading pages of the (offset-sorted) array can be
 * sent in a single unfragmented bulk: we stop early if a page does not
 * end on a page boundary or a following page does not start on one.
 * NOTE(review): the counting loop increments and the return of the
 * full count are elided in this excerpt.
 */
1600 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1606 LASSERT (pages > 0);
1607 offset = pg[i]->off & ~CFS_PAGE_MASK;
1611 if (pages == 0) /* that's all */
1614 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1615 return count; /* doesn't end on page boundary */
1618 offset = pg[i]->off & ~CFS_PAGE_MASK;
1619 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate and return an array of pointers to the caller's contiguous
 * brw_page array, so the pages can be sorted/split without moving the
 * originals.  Freed by osc_release_ppga().  Returns NULL on allocation
 * failure (elided in this excerpt).
 */
1626 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1628 struct brw_page **ppga;
1631 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1635 for (i = 0; i < count; i++)
/* Free a pointer array built by osc_build_ppga(); 'count' must be the
 * count it was built with. */
1640 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1642 LASSERT(ppga != NULL);
1643 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Top-level synchronous BRW entry point.  Splits the page array into
 * chunks no larger than cl_max_pages_per_rpc (further limited to an
 * unfragmented prefix) and issues one osc_brw_internal() call per
 * chunk.  Because the server clobbers the obdo on each reply, a copy
 * is saved before the first of multiple chunks and restored before
 * each subsequent one.  OBD_BRW_CHECK is a probe: only import
 * validity is tested.  NOTE(review): error/return lines between the
 * visible ones are elided in this excerpt.
 */
1646 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1647 obd_count page_count, struct brw_page *pga,
1648 struct obd_trans_info *oti)
1650 struct obdo *saved_oa = NULL;
1651 struct brw_page **ppga, **orig;
1652 struct obd_import *imp = class_exp2cliimp(exp);
1653 struct client_obd *cli = &imp->imp_obd->u.cli;
1654 int rc, page_count_orig;
1657 if (cmd & OBD_BRW_CHECK) {
1658 /* The caller just wants to know if there's a chance that this
1659 * I/O can succeed */
1661 if (imp == NULL || imp->imp_invalid)
1666 /* test_brw with a failed create can trip this, maybe others. */
1667 LASSERT(cli->cl_max_pages_per_rpc);
1671 orig = ppga = osc_build_ppga(pga, page_count);
/* Remember the original count: ppga/page_count are consumed below. */
1674 page_count_orig = page_count;
1676 sort_brw_pages(ppga, page_count);
1677 while (page_count) {
1678 obd_count pages_per_brw;
1680 if (page_count > cli->cl_max_pages_per_rpc)
1681 pages_per_brw = cli->cl_max_pages_per_rpc;
1683 pages_per_brw = page_count;
1685 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1687 if (saved_oa != NULL) {
1688 /* restore previously saved oa */
1689 *oinfo->oi_oa = *saved_oa;
1690 } else if (page_count > pages_per_brw) {
1691 /* save a copy of oa (brw will clobber it) */
1692 OBDO_ALLOC(saved_oa);
1693 if (saved_oa == NULL)
1694 GOTO(out, rc = -ENOMEM);
1695 *saved_oa = *oinfo->oi_oa;
1698 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1699 pages_per_brw, ppga, oinfo->oi_capa);
1704 page_count -= pages_per_brw;
1705 ppga += pages_per_brw;
/* Free with the original pointer/count, not the advanced ones. */
1709 osc_release_ppga(orig, page_count_orig);
1711 if (saved_oa != NULL)
1712 OBDO_FREE(saved_oa);
/*
 * Asynchronous counterpart of osc_brw(): splits the page array into
 * RPC-sized, unfragmented chunks and hands each to async_internal()
 * on the caller's request set.  Each chunk gets its own copy of the
 * pointer array because async_internal()'s completion path frees it;
 * the sole exception is a single-RPC transfer, which may pass ppga
 * itself.  NOTE(review): several error/return lines are elided in
 * this excerpt.
 */
1717 static int osc_brw_async(int cmd, struct obd_export *exp,
1718 struct obd_info *oinfo, obd_count page_count,
1719 struct brw_page *pga, struct obd_trans_info *oti,
1720 struct ptlrpc_request_set *set)
1722 struct brw_page **ppga, **orig;
1723 struct client_obd *cli = &exp->exp_obd->u.cli;
1724 int page_count_orig;
1728 if (cmd & OBD_BRW_CHECK) {
1729 struct obd_import *imp = class_exp2cliimp(exp);
1730 /* The caller just wants to know if there's a chance that this
1731 * I/O can succeed */
1733 if (imp == NULL || imp->imp_invalid)
1738 orig = ppga = osc_build_ppga(pga, page_count);
1741 page_count_orig = page_count;
1743 sort_brw_pages(ppga, page_count);
1744 while (page_count) {
1745 struct brw_page **copy;
1746 obd_count pages_per_brw;
1748 pages_per_brw = min_t(obd_count, page_count,
1749 cli->cl_max_pages_per_rpc);
1751 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1753 /* use ppga only if single RPC is going to fly */
1754 if (pages_per_brw != page_count_orig || ppga != orig) {
1755 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1757 GOTO(out, rc = -ENOMEM);
1758 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1762 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1763 pages_per_brw, copy, set, oinfo->oi_capa);
/* On error we still own 'copy' and must free it ourselves. */
1767 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1771 /* we passed it to async_internal() which is
1772 * now responsible for releasing memory */
1776 page_count -= pages_per_brw;
1777 ppga += pages_per_brw;
1781 osc_release_ppga(orig, page_count_orig);
1785 static void osc_check_rpcs(struct client_obd *cli);
1787 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1788 * the dirty accounting. Writeback completes or truncate happens before
1789 * writing starts. Must be called with the loi lock held. */
/* Releases the write grant/dirty accounting held by the page; 'sent'
 * indicates whether the page actually made it to the wire. */
1790 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1793 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1797 /* This maintains the lists of pending pages to read/write for a given object
1798 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1799 * to quickly find objects that are ready to send an RPC. */
/* Returns non-zero when this lop should trigger an RPC now:
 * - always, if the import is invalid (drain queued pages via failing RPCs)
 * - if any urgent page is queued
 * - (writes) if cache waiters are blocked on space
 * - if enough pages are pending to fill an 'optimal' RPC.
 * NOTE(review): the RETURN statements for each branch are elided in
 * this excerpt. */
1800 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1806 if (lop->lop_num_pending == 0)
1809 /* if we have an invalid import we want to drain the queued pages
1810 * by forcing them through rpcs that immediately fail and complete
1811 * the pages. recovery relies on this to empty the queued pages
1812 * before canceling the locks and evicting down the llite pages */
1813 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1816 /* stream rpcs in queue order as long as as there is an urgent page
1817 * queued. this is our cheap solution for good batching in the case
1818 * where writepage marks some random page in the middle of the file
1819 * as urgent because of, say, memory pressure */
1820 if (!list_empty(&lop->lop_urgent)) {
1821 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1824 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1825 optimal = cli->cl_max_pages_per_rpc;
1826 if (cmd & OBD_BRW_WRITE) {
1827 /* trigger a write rpc stream as long as there are dirtiers
1828 * waiting for space. as they're waiting, they're not going to
1829 * create more pages to coallesce with what's waiting.. */
1830 if (!list_empty(&cli->cl_cache_waiters)) {
1831 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1834 /* +16 to avoid triggering rpcs that would want to include pages
1835 * that are being queued but which can't be made ready until
1836 * the queuer finishes with the page. this is a wart for
1837 * llite::commit_write() */
1840 if (lop->lop_num_pending >= optimal)
/* Idempotently add 'item' to 'list' or remove it, so that its
 * membership matches the boolean 'should_be_on' (third parameter,
 * elided in this excerpt). */
1846 static void on_list(struct list_head *item, struct list_head *list,
1849 if (list_empty(item) && should_be_on)
1850 list_add_tail(item, list);
1851 else if (!list_empty(item) && !should_be_on)
1852 list_del_init(item);
1855 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1856 * can find pages to build into rpcs quickly */
/* Keep the loi on cl_loi_ready_list iff either direction would make
 * an RPC now, and on the write/read lists iff pages are pending. */
1857 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1859 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1860 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1861 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1863 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1864 loi->loi_write_lop.lop_num_pending);
1866 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1867 loi->loi_read_lop.lop_num_pending);
/* Adjust the lop's pending-page count by 'delta' (may be negative)
 * and mirror the change into the client's per-direction totals. */
1870 static void lop_update_pending(struct client_obd *cli,
1871 struct loi_oap_pages *lop, int cmd, int delta)
1873 lop->lop_num_pending += delta;
1874 if (cmd & OBD_BRW_WRITE)
1875 cli->cl_pending_w_pages += delta;
1877 cli->cl_pending_r_pages += delta;
1880 /* this is called when a sync waiter receives an interruption. Its job is to
1881 * get the caller woken as soon as possible. If its page hasn't been put in an
1882 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1883 * desiring interruption which will forcefully complete the rpc once the rpc
 * completes (continuation of the original comment; its last line is
 * elided in this excerpt). */
1885 static void osc_occ_interrupted(struct oig_callback_context *occ)
1887 struct osc_async_page *oap;
1888 struct loi_oap_pages *lop;
1889 struct lov_oinfo *loi;
1892 /* XXX member_of() */
1893 oap = list_entry(occ, struct osc_async_page, oap_occ);
1895 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1897 oap->oap_interrupted = 1;
1899 /* ok, it's been put in an rpc. only one oap gets a request reference */
1900 if (oap->oap_request != NULL) {
/* Mark the in-flight request interrupted and kick ptlrpcd so it
 * notices promptly. */
1901 ptlrpc_mark_interrupted(oap->oap_request);
1902 ptlrpcd_wake(oap->oap_request);
1906 /* we don't get interruption callbacks until osc_trigger_group_io()
1907 * has been called and put the sync oaps in the pending/urgent lists.*/
1908 if (!list_empty(&oap->oap_pending_item)) {
/* Not yet in an RPC: dequeue immediately and complete with -EINTR. */
1909 list_del_init(&oap->oap_pending_item);
1910 list_del_init(&oap->oap_urgent_item);
1913 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1914 &loi->loi_write_lop : &loi->loi_read_lop;
1915 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1916 loi_list_maint(oap->oap_cli, oap->oap_loi);
1918 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1919 oap->oap_oig = NULL;
1923 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1926 /* this is trying to propogate async writeback errors back up to the
1927 * application. As an async write fails we record the error code for later if
1928 * the app does an fsync. As long as errors persist we force future rpcs to be
1929 * sync so that the app can get a sync error and break the cycle of queueing
1930 * pages for which writeback will fail. */
/* On error (in elided code) record rc and force sync mode from the
 * next xid onward; a later success at/after ar_min_xid clears the
 * forced-sync state. */
1931 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1938 ar->ar_force_sync = 1;
1939 ar->ar_min_xid = ptlrpc_sample_next_xid();
1944 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1945 ar->ar_force_sync = 0;
/* Queue the oap on its object's pending list for the appropriate
 * direction (also on the urgent list if flagged) and bump the pending
 * accounting.  Caller holds the loi list lock. */
1948 static void osc_oap_to_pending(struct osc_async_page *oap)
1950 struct loi_oap_pages *lop;
1952 if (oap->oap_cmd & OBD_BRW_WRITE)
1953 lop = &oap->oap_loi->loi_write_lop;
1955 lop = &oap->oap_loi->loi_read_lop;
1957 if (oap->oap_async_flags & ASYNC_URGENT)
1958 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1959 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1960 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1963 /* this must be called holding the loi list lock to give coverage to exit_cache,
1964 * async_flag maintenance, and oap_request */
/* Per-page completion: drops the oap's request reference, records
 * write errors for fsync propagation, copies fresh size/time data
 * from the reply obdo into the loi's lvb, completes any group-io
 * waiter, and calls the upper layer's ap_completion — which may ask
 * us to requeue the page instead of exiting the cache. */
1965 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1966 struct osc_async_page *oap, int sent, int rc)
1971 if (oap->oap_request != NULL) {
1972 xid = ptlrpc_req_xid(oap->oap_request);
1973 ptlrpc_req_finished(oap->oap_request);
1974 oap->oap_request = NULL;
1977 oap->oap_async_flags = 0;
1978 oap->oap_interrupted = 0;
1980 if (oap->oap_cmd & OBD_BRW_WRITE) {
/* Track async write errors at both client and object granularity. */
1981 osc_process_ar(&cli->cl_ar, xid, rc);
1982 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1985 if (rc == 0 && oa != NULL) {
1986 if (oa->o_valid & OBD_MD_FLBLOCKS)
1987 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1988 if (oa->o_valid & OBD_MD_FLMTIME)
1989 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1990 if (oa->o_valid & OBD_MD_FLATIME)
1991 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1992 if (oa->o_valid & OBD_MD_FLCTIME)
1993 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
/* Group I/O path (elided condition): exit cache and complete the oig. */
1997 osc_exit_cache(cli, oap, sent);
1998 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1999 oap->oap_oig = NULL;
2004 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2005 oap->oap_cmd, oa, rc);
2007 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2008 * I/O on the page could start, but OSC calls it under lock
2009 * and thus we can add oap back to pending safely */
2011 /* upper layer wants to leave the page on pending queue */
2012 osc_oap_to_pending(oap);
2014 osc_exit_cache(cli, oap, sent);
/*
 * Interpret callback for oap-based BRW RPCs built by osc_build_req().
 * Finishes the request (retrying recoverable errors), decrements the
 * in-flight counter, runs per-page completion for every oap, wakes
 * cache waiters, tries to launch more RPCs, then frees the obdo and
 * page array.  NOTE(review): the cli assignment and final RETURN are
 * elided in this excerpt.
 */
2018 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
2020 struct osc_async_page *oap, *tmp;
2021 struct osc_brw_async_args *aa = data;
2022 struct client_obd *cli;
2025 rc = osc_brw_fini_request(req, rc);
2026 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2027 if (osc_recoverable_error(rc)) {
2028 rc = osc_brw_redo_request(req, aa);
2035 client_obd_list_lock(&cli->cl_loi_list_lock);
2037 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2038 * is called so we know whether to go to sync BRWs or wait for more
2039 * RPCs to complete */
2040 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2041 cli->cl_w_in_flight--;
2043 cli->cl_r_in_flight--;
2045 /* the caller may re-use the oap after the completion call so
2046 * we need to clean it up a little */
2047 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2048 list_del_init(&oap->oap_rpc_item);
2049 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2052 osc_wake_cache_waiters(cli);
2053 osc_check_rpcs(cli);
2055 client_obd_list_unlock(&cli->cl_loi_list_lock);
2057 OBDO_FREE(aa->aa_oa);
2059 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
/*
 * Turn a list of osc_async_pages into a ready-to-send BRW request.
 * Allocates a brw_page pointer array and an obdo, fills the obdo via
 * the caller ops, sorts the pages by offset, preps the request, then
 * splices the oap list into the request's async args for the
 * interpret callback.  Returns the request or ERR_PTR() on failure.
 * NOTE(review): the obdo allocation, the 'i++' in the page loop, the
 * aa field assignments and the cleanup labels are elided in this
 * excerpt.
 */
2063 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2064 struct list_head *rpc_list,
2065 int page_count, int cmd)
2067 struct ptlrpc_request *req;
2068 struct brw_page **pga = NULL;
2069 struct osc_brw_async_args *aa;
2070 struct obdo *oa = NULL;
2071 struct obd_async_page_ops *ops = NULL;
2072 void *caller_data = NULL;
2073 struct obd_capa *ocapa;
2074 struct osc_async_page *oap;
2078 LASSERT(!list_empty(rpc_list));
2080 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2082 RETURN(ERR_PTR(-ENOMEM));
2086 GOTO(out, req = ERR_PTR(-ENOMEM));
2089 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2091 ops = oap->oap_caller_ops;
2092 caller_data = oap->oap_caller_data;
2094 pga[i] = &oap->oap_brw_page;
2095 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2096 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2097 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2101 /* always get the data for the obdo for the rpc */
2102 LASSERT(ops != NULL);
2103 ops->ap_fill_obdo(caller_data, cmd, oa);
2104 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2106 sort_brw_pages(pga, page_count);
2107 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2111 CERROR("prep_req failed: %d\n", rc);
2112 GOTO(out, req = ERR_PTR(rc));
2115 /* Need to update the timestamps after the request is built in case
2116 * we race with setattr (locally or in queue at OST). If OST gets
2117 * later setattr before earlier BRW (as determined by the request xid),
2118 * the OST will not use BRW timestamps. Sadly, there is no obvious
2119 * way to do this in a single call. bug 10150 */
2120 ops->ap_update_obdo(caller_data, cmd, oa,
2121 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2123 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2124 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2125 INIT_LIST_HEAD(&aa->aa_oaps);
/* Move (not copy) the oap list into the request's async args. */
2126 list_splice(rpc_list, &aa->aa_oaps);
2127 INIT_LIST_HEAD(rpc_list);
2134 OBD_FREE(pga, sizeof(*pga) * page_count);
2139 /* the loi lock is held across this function but it's allowed to release
2140 * and reacquire it during its work */
/*
 * Pull ready pages off a lop's pending list, build one RPC from them
 * and hand it to ptlrpcd.  Stops collecting at the first not-ready
 * page, at a fragmentation boundary, at cl_max_pages_per_rpc, or at
 * a PTLRPC_MAX_BRW_SIZE boundary (to keep RPCs aligned).  Returns
 * (in elided code) how far it got; 0 means make_ready asked us to
 * back off.  NOTE(review): many interior lines — error returns,
 * the oap_request reference assignment, RETURN — are elided in this
 * excerpt; comments describe only the visible logic.
 */
2141 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2142 int cmd, struct loi_oap_pages *lop)
2144 struct ptlrpc_request *req;
2145 obd_count page_count = 0;
2146 struct osc_async_page *oap = NULL, *tmp;
2147 struct osc_brw_async_args *aa;
2148 struct obd_async_page_ops *ops;
2149 CFS_LIST_HEAD(rpc_list);
2150 unsigned int ending_offset;
2151 unsigned starting_offset = 0;
2154 /* first we find the pages we're allowed to work with */
2155 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2157 ops = oap->oap_caller_ops;
2159 LASSERT(oap->oap_magic == OAP_MAGIC);
2161 /* in llite being 'ready' equates to the page being locked
2162 * until completion unlocks it. commit_write submits a page
2163 * as not ready because its unlock will happen unconditionally
2164 * as the call returns. if we race with commit_write giving
2165 * us that page we dont' want to create a hole in the page
2166 * stream, so we stop and leave the rpc to be fired by
2167 * another dirtier or kupdated interval (the not ready page
2168 * will still be on the dirty list). we could call in
2169 * at the end of ll_file_write to process the queue again. */
2170 if (!(oap->oap_async_flags & ASYNC_READY)) {
2171 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2173 CDEBUG(D_INODE, "oap %p page %p returned %d "
2174 "instead of ready\n", oap,
2178 /* llite is telling us that the page is still
2179 * in commit_write and that we should try
2180 * and put it in an rpc again later. we
2181 * break out of the loop so we don't create
2182 * a hole in the sequence of pages in the rpc
 * stream (tail of the original comment elided). */
2187 /* the io isn't needed.. tell the checks
2188 * below to complete the rpc with EINTR */
2189 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2190 oap->oap_count = -EINTR;
2193 oap->oap_async_flags |= ASYNC_READY;
/* Any other make_ready return value is a caller bug. */
2196 LASSERTF(0, "oap %p page %p returned %d "
2197 "from make_ready\n", oap,
2205 * Page submitted for IO has to be locked. Either by
2206 * ->ap_make_ready() or by higher layers.
2208 * XXX nikita: this assertion should be adjusted when lustre
2209 * starts using PG_writeback for pages being written out.
 */
2211 #if defined(__KERNEL__) && defined(__linux__)
2212 LASSERT(PageLocked(oap->oap_page));
2214 /* If there is a gap at the start of this page, it can't merge
2215 * with any previous page, so we'll hand the network a
2216 * "fragmented" page array that it can't transfer in 1 RDMA */
2217 if (page_count != 0 && oap->oap_page_off != 0)
2220 /* take the page out of our book-keeping */
2221 list_del_init(&oap->oap_pending_item);
2222 lop_update_pending(cli, lop, cmd, -1);
2223 list_del_init(&oap->oap_urgent_item);
2225 if (page_count == 0)
2226 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2227 (PTLRPC_MAX_BRW_SIZE - 1);
2229 /* ask the caller for the size of the io as the rpc leaves. */
2230 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2232 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2233 if (oap->oap_count <= 0) {
/* Zero/negative count (e.g. the -EINTR set above): complete the
 * page immediately rather than including it in the RPC. */
2234 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2236 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2240 /* now put the page back in our accounting */
2241 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2242 if (++page_count >= cli->cl_max_pages_per_rpc)
2245 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2246 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2247 * have the same alignment as the initial writes that allocated
2248 * extents on the server. */
2249 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2250 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2251 if (ending_offset == 0)
2254 /* If there is a gap at the end of this page, it can't merge
2255 * with any subsequent pages, so we'll hand the network a
2256 * "fragmented" page array that it can't transfer in 1 RDMA */
2257 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2261 osc_wake_cache_waiters(cli);
2263 if (page_count == 0)
2266 loi_list_maint(cli, loi);
/* Drop the loi lock while building the request (may allocate/sleep). */
2268 client_obd_list_unlock(&cli->cl_loi_list_lock);
2270 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2272 /* this should happen rarely and is pretty bad, it makes the
2273 * pending list not follow the dirty order */
2274 client_obd_list_lock(&cli->cl_loi_list_lock);
2275 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2276 list_del_init(&oap->oap_rpc_item);
2278 /* queued sync pages can be torn down while the pages
2279 * were between the pending list and the rpc */
2280 if (oap->oap_interrupted) {
2281 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2282 osc_ap_completion(cli, NULL, oap, 0,
2286 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2288 loi_list_maint(cli, loi);
2289 RETURN(PTR_ERR(req));
2292 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2294 if (cmd == OBD_BRW_READ) {
2295 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2296 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2297 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2298 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2299 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2301 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2302 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2303 cli->cl_w_in_flight);
2304 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2305 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2306 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2309 client_obd_list_lock(&cli->cl_loi_list_lock);
2311 if (cmd == OBD_BRW_READ)
2312 cli->cl_r_in_flight++;
2314 cli->cl_w_in_flight++;
2316 /* queued sync pages can be torn down while the pages
2317 * were between the pending list and the rpc */
2319 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2320 /* only one oap gets a request reference */
2323 if (oap->oap_interrupted && !req->rq_intr) {
2324 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2326 ptlrpc_mark_interrupted(req);
2330 tmp->oap_request = ptlrpc_request_addref(req);
2332 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2333 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2335 req->rq_interpret_reply = brw_interpret_oap;
2336 ptlrpcd_add_req(req);
/* Debug helper: dump an loi's readiness flag and per-direction
 * pending/urgent state, then the caller's format string/args. */
2340 #define LOI_DEBUG(LOI, STR, args...) \
2341 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2342 !list_empty(&(LOI)->loi_cli_item), \
2343 (LOI)->loi_write_lop.lop_num_pending, \
2344 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2345 (LOI)->loi_read_lop.lop_num_pending, \
2346 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2349 /* This is called by osc_check_rpcs() to find which objects have pages that
2350 * we could be sending. These lists are maintained by lop_makes_rpc(). */
/* Selection priority: (1) objects already known ready, (2) any object
 * with queued writes when cache waiters are blocked, (3) any queued
 * object at all when the import is invalid (drain), else NULL
 * (elided RETURN). */
2351 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2354 /* first return all objects which we already know to have
2355 * pages ready to be stuffed into rpcs */
2356 if (!list_empty(&cli->cl_loi_ready_list))
2357 RETURN(list_entry(cli->cl_loi_ready_list.next,
2358 struct lov_oinfo, loi_cli_item));
2360 /* then if we have cache waiters, return all objects with queued
2361 * writes. This is especially important when many small files
2362 * have filled up the cache and not been fired into rpcs because
2363 * they don't pass the nr_pending/object threshhold */
2364 if (!list_empty(&cli->cl_cache_waiters) &&
2365 !list_empty(&cli->cl_loi_write_list))
2366 RETURN(list_entry(cli->cl_loi_write_list.next,
2367 struct lov_oinfo, loi_write_item));
2369 /* then return all queued objects when we have an invalid import
2370 * so that they get flushed */
2371 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2372 if (!list_empty(&cli->cl_loi_write_list))
2373 RETURN(list_entry(cli->cl_loi_write_list.next,
2374 struct lov_oinfo, loi_write_item));
2375 if (!list_empty(&cli->cl_loi_read_list))
2376 RETURN(list_entry(cli->cl_loi_read_list.next,
2377 struct lov_oinfo, loi_read_item));
2382 /* called with the loi list lock held */
/*
 * RPC dispatch loop: while in-flight RPCs are below the limit, pick
 * the next eligible object and try to send a write RPC then a read
 * RPC for it, rotating the object off the front of the lists for
 * inter-object fairness.  A race counter bounds repeated "back off"
 * (rc == 0) results from osc_send_oap_rpc so we don't spin.
 * NOTE(review): the race_counter updates and loop exits are partly
 * elided in this excerpt.
 */
2383 static void osc_check_rpcs(struct client_obd *cli)
2385 struct lov_oinfo *loi;
2386 int rc = 0, race_counter = 0;
2389 while ((loi = osc_next_loi(cli)) != NULL) {
2390 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2392 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2395 /* attempt some read/write balancing by alternating between
2396 * reads and writes in an object. The makes_rpc checks here
2397 * would be redundant if we were getting read/write work items
2398 * instead of objects. we don't want send_oap_rpc to drain a
2399 * partial read pending queue when we're given this object to
2400 * do io on writes while there are cache waiters */
2401 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2402 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2403 &loi->loi_write_lop);
2411 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2412 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2413 &loi->loi_read_lop);
2422 /* attempt some inter-object balancing by issueing rpcs
2423 * for each object in turn */
2424 if (!list_empty(&loi->loi_cli_item))
2425 list_del_init(&loi->loi_cli_item);
2426 if (!list_empty(&loi->loi_write_item))
2427 list_del_init(&loi->loi_write_item);
2428 if (!list_empty(&loi->loi_read_item))
2429 list_del_init(&loi->loi_read_item);
2431 loi_list_maint(cli, loi);
2433 /* send_oap_rpc fails with 0 when make_ready tells it to
2434 * back off. llite's make_ready does this when it tries
2435 * to lock a page queued for write that is already locked.
2436 * we want to try sending rpcs from many objects, but we
2437 * don't want to spin failing with 0. */
2438 if (race_counter == 10)
2444 /* we're trying to queue a page in the osc so we're subject to the
2445 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2446 * If the osc's queued pages are already at that limit, then we want to sleep
2447 * until there is space in the osc's queue for us. We also may be waiting for
2448 * write credits from the OST if there are RPCs in flight that may return some
2449 * before we fall back to sync writes.
2451 * We need this know our allocation was granted in the presence of signals */
/* Wait condition for osc_enter_cache(): true once the waiter has been
 * taken off the cache-waiter list (grant arrived) or no RPCs remain in
 * flight that could return grant. */
2452 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2456 client_obd_list_lock(&cli->cl_loi_list_lock);
2457 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2458 client_obd_list_unlock(&cli->cl_loi_list_lock);
2462 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2463 * grant or cache space. */
/* Admission control for queuing a dirty page: either consume grant and
 * dirty-page budget immediately, or (if writes are in flight) register
 * as a cache waiter and sleep until woken.
 * NOTE(review): some lines are elided from this listing; the sync-io
 * fallback and final return paths are not fully visible. */
2464 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2465 struct osc_async_page *oap)
2467 struct osc_cache_waiter ocw;
2468 struct l_wait_info lwi = { 0 };
2472 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2473 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2474 cli->cl_dirty_max, obd_max_dirty_pages,
2475 cli->cl_lost_grant, cli->cl_avail_grant);
2477 /* force the caller to try sync io. this can jump the list
2478 * of queued writes and create a discontiguous rpc stream */
2479 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2480 loi->loi_ar.ar_force_sync)
2483 /* Hopefully normal case - cache space and write credits available */
2484 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2485 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2486 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2487 /* account for ourselves */
2488 osc_consume_write_grant(cli, &oap->oap_brw_page);
2492 /* Make sure that there are write rpcs in flight to wait for. This
2493 * is a little silly as this object may not have any pending but
2494 * other objects sure might. */
2495 if (cli->cl_w_in_flight) {
2496 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2497 cfs_waitq_init(&ocw.ocw_waitq);
/* kick RPC dispatch, then drop the lock for the duration of the sleep */
2501 loi_list_maint(cli, loi);
2502 osc_check_rpcs(cli);
2503 client_obd_list_unlock(&cli->cl_loi_list_lock);
2505 CDEBUG(D_CACHE, "sleeping for cache space\n");
2506 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2508 client_obd_list_lock(&cli->cl_loi_list_lock);
/* still on the waiter list => woken without grant; unlink ourselves */
2509 if (!list_empty(&ocw.ocw_entry)) {
2510 list_del(&ocw.ocw_entry);
/* Initialize an osc_async_page (oap) cookie for a cache page.  With a
 * NULL result pointer the function appears to report the rounded oap
 * size instead (see the early return below) — presumably so callers can
 * size their allocation; TODO confirm against callers. */
2519 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2520 struct lov_oinfo *loi, cfs_page_t *page,
2521 obd_off offset, struct obd_async_page_ops *ops,
2522 void *data, void **res)
2524 struct osc_async_page *oap;
2528 return size_round(sizeof(*oap));
2531 oap->oap_magic = OAP_MAGIC;
2532 oap->oap_cli = &exp->exp_obd->u.cli;
2535 oap->oap_caller_ops = ops;
2536 oap->oap_caller_data = data;
2538 oap->oap_page = page;
2539 oap->oap_obj_off = offset;
/* oap starts on no list; list heads must be initialized for the
 * list_empty() checks used throughout this file */
2541 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2542 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2543 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2545 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2547 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
/* Validate and convert an opaque cookie back into an osc_async_page;
 * returns ERR_PTR(-EINVAL) if the magic does not match. */
2551 struct osc_async_page *oap_from_cookie(void *cookie)
2553 struct osc_async_page *oap = cookie;
2554 if (oap->oap_magic != OAP_MAGIC)
2555 return ERR_PTR(-EINVAL);
/* Queue one async page for read or write.  Rejects pages already on a
 * list, checks quota for non-NOQUOTA writes, enters the dirty cache for
 * writes, then moves the oap to pending and kicks RPC dispatch.
 * NOTE(review): elided lines hide some error paths in this listing. */
2559 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2560 struct lov_oinfo *loi, void *cookie,
2561 int cmd, obd_off off, int count,
2562 obd_flag brw_flags, enum async_flags async_flags)
2564 struct client_obd *cli = &exp->exp_obd->u.cli;
2565 struct osc_async_page *oap;
2569 oap = oap_from_cookie(cookie);
2571 RETURN(PTR_ERR(oap));
/* refuse new io against an invalid/absent import */
2573 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* a page may only be queued once */
2576 if (!list_empty(&oap->oap_pending_item) ||
2577 !list_empty(&oap->oap_urgent_item) ||
2578 !list_empty(&oap->oap_rpc_item))
2581 /* check if the file's owner/group is over quota */
2582 #ifdef HAVE_QUOTA_SUPPORT
2583 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2584 struct obd_async_page_ops *ops;
2591 ops = oap->oap_caller_ops;
2592 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2593 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
/* single-stripe OSC object: stripe 0 is the only oinfo */
2604 loi = lsm->lsm_oinfo[0];
2606 client_obd_list_lock(&cli->cl_loi_list_lock);
2609 oap->oap_page_off = off;
2610 oap->oap_count = count;
2611 oap->oap_brw_flags = brw_flags;
2612 oap->oap_async_flags = async_flags;
2614 if (cmd & OBD_BRW_WRITE) {
/* may drop and retake the list lock while waiting for grant */
2615 rc = osc_enter_cache(cli, loi, oap);
2617 client_obd_list_unlock(&cli->cl_loi_list_lock);
2622 osc_oap_to_pending(oap);
2623 loi_list_maint(cli, loi);
2625 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2628 osc_check_rpcs(cli);
2629 client_obd_list_unlock(&cli->cl_loi_list_lock);
2634 /* aka (~was & now & flag), but this is more clear :) */
2635 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
/* Raise additional async flags on an already-queued oap (the only
 * transitions handled are setting ASYNC_READY and ASYNC_URGENT; flags
 * are never cleared here).  Kicks RPC dispatch afterwards. */
2637 static int osc_set_async_flags(struct obd_export *exp,
2638 struct lov_stripe_md *lsm,
2639 struct lov_oinfo *loi, void *cookie,
2640 obd_flag async_flags)
2642 struct client_obd *cli = &exp->exp_obd->u.cli;
2643 struct loi_oap_pages *lop;
2644 struct osc_async_page *oap;
2648 oap = oap_from_cookie(cookie);
2650 RETURN(PTR_ERR(oap));
2653 * bug 7311: OST-side locking is only supported for liblustre for now
2654 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2655 * implementation has to handle case where OST-locked page was picked
2656 * up by, e.g., ->writepage().
2658 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2659 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2662 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2666 loi = lsm->lsm_oinfo[0];
/* pick the read or write page list according to the oap's command */
2668 if (oap->oap_cmd & OBD_BRW_WRITE) {
2669 lop = &loi->loi_write_lop;
2671 lop = &loi->loi_read_lop;
2674 client_obd_list_lock(&cli->cl_loi_list_lock);
/* flags only make sense for a page that is still pending */
2676 if (list_empty(&oap->oap_pending_item))
2677 GOTO(out, rc = -EINVAL);
/* nothing new being set -> nothing to do */
2679 if ((oap->oap_async_flags & async_flags) == async_flags)
2682 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2683 oap->oap_async_flags |= ASYNC_READY;
2685 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
/* only move to the urgent list if not already part of an RPC */
2686 if (list_empty(&oap->oap_rpc_item)) {
2687 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2688 loi_list_maint(cli, loi);
2692 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2693 oap->oap_async_flags);
2695 osc_check_rpcs(cli);
2696 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Queue an async page onto an obd_io_group's pending-group list (group
 * io is dispatched later by osc_trigger_group_io(), not immediately).
 * ASYNC_GROUP_SYNC pages also register their occ with the group. */
2700 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2701 struct lov_oinfo *loi,
2702 struct obd_io_group *oig, void *cookie,
2703 int cmd, obd_off off, int count,
2705 obd_flag async_flags)
2707 struct client_obd *cli = &exp->exp_obd->u.cli;
2708 struct osc_async_page *oap;
2709 struct loi_oap_pages *lop;
2713 oap = oap_from_cookie(cookie);
2715 RETURN(PTR_ERR(oap));
2717 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
/* the page must not already be queued anywhere */
2720 if (!list_empty(&oap->oap_pending_item) ||
2721 !list_empty(&oap->oap_urgent_item) ||
2722 !list_empty(&oap->oap_rpc_item))
2726 loi = lsm->lsm_oinfo[0];
2728 client_obd_list_lock(&cli->cl_loi_list_lock);
2731 oap->oap_page_off = off;
2732 oap->oap_count = count;
2733 oap->oap_brw_flags = brw_flags;
2734 oap->oap_async_flags = async_flags;
2736 if (cmd & OBD_BRW_WRITE)
2737 lop = &loi->loi_write_lop;
2739 lop = &loi->loi_read_lop;
2741 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2742 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2744 rc = oig_add_one(oig, &oap->oap_occ);
2747 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2748 oap, oap->oap_page, rc);
2750 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Move every oap on a lop's pending-group list onto the regular pending
 * lists so normal RPC generation picks them up; caller context implies
 * the loi list lock is held (matches the callers in this file). */
2755 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2756 struct loi_oap_pages *lop, int cmd)
2758 struct list_head *pos, *tmp;
2759 struct osc_async_page *oap;
/* _safe variant: each oap is unlinked while iterating */
2761 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2762 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2763 list_del(&oap->oap_pending_item);
2764 osc_oap_to_pending(oap);
2766 loi_list_maint(cli, loi);
/* Release a previously queued io group: promote both the write and read
 * pending-group lists to the pending lists and kick RPC dispatch. */
2769 static int osc_trigger_group_io(struct obd_export *exp,
2770 struct lov_stripe_md *lsm,
2771 struct lov_oinfo *loi,
2772 struct obd_io_group *oig)
2774 struct client_obd *cli = &exp->exp_obd->u.cli;
2778 loi = lsm->lsm_oinfo[0];
2780 client_obd_list_lock(&cli->cl_loi_list_lock);
2782 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2783 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2785 osc_check_rpcs(cli);
2786 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Remove an oap from all osc queues and release its cache accounting.
 * Fails with -EBUSY if the page is already part of an in-flight RPC. */
2791 static int osc_teardown_async_page(struct obd_export *exp,
2792 struct lov_stripe_md *lsm,
2793 struct lov_oinfo *loi, void *cookie)
2795 struct client_obd *cli = &exp->exp_obd->u.cli;
2796 struct loi_oap_pages *lop;
2797 struct osc_async_page *oap;
2801 oap = oap_from_cookie(cookie);
2803 RETURN(PTR_ERR(oap));
2806 loi = lsm->lsm_oinfo[0];
2808 if (oap->oap_cmd & OBD_BRW_WRITE) {
2809 lop = &loi->loi_write_lop;
2811 lop = &loi->loi_read_lop;
2814 client_obd_list_lock(&cli->cl_loi_list_lock);
/* cannot tear down a page an RPC currently owns */
2816 if (!list_empty(&oap->oap_rpc_item))
2817 GOTO(out, rc = -EBUSY);
/* give back grant/dirty accounting and let cache waiters retry */
2819 osc_exit_cache(cli, oap, 0);
2820 osc_wake_cache_waiters(cli);
2822 if (!list_empty(&oap->oap_urgent_item)) {
2823 list_del_init(&oap->oap_urgent_item);
2824 oap->oap_async_flags &= ~ASYNC_URGENT;
2826 if (!list_empty(&oap->oap_pending_item)) {
2827 list_del_init(&oap->oap_pending_item);
2828 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2830 loi_list_maint(cli, loi);
2832 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2834 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Attach inode data to an LDLM lock's l_ast_data, asserting that any
 * previous ast_data belongs to an inode being freed (I_FREEING) — an
 * inconsistent non-freeing inode indicates a cbdata bug and is fatal. */
2838 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2841 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
/* lock handle no longer resolves — likely evicted; see message */
2844 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2847 lock_res_and_lock(lock);
2848 #if defined (__KERNEL__) && defined (__linux__)
2849 /* Liang XXX: Darwin and Winnt checking should be added */
2850 if (lock->l_ast_data && lock->l_ast_data != data) {
2851 struct inode *new_inode = data;
2852 struct inode *old_inode = lock->l_ast_data;
2853 if (!(old_inode->i_state & I_FREEING))
2854 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2855 LASSERTF(old_inode->i_state & I_FREEING,
2856 "Found existing inode %p/%lu/%u state %lu in lock: "
2857 "setting data to %p/%lu/%u\n", old_inode,
2858 old_inode->i_ino, old_inode->i_generation,
2860 new_inode, new_inode->i_ino, new_inode->i_generation);
2863 lock->l_ast_data = data;
/* only the NO_LRU bit of the caller's flags is propagated */
2864 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2865 unlock_res_and_lock(lock);
2866 LDLM_LOCK_PUT(lock);
/* Iterate all locks on this object's resource (id from the stripe md's
 * object id/group), applying @replace with @data to each. */
2869 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2870 ldlm_iterator_t replace, void *data)
2872 struct ldlm_res_id res_id = { .name = {0} };
2873 struct obd_device *obd = class_exp2obd(exp);
2875 res_id.name[0] = lsm->lsm_object_id;
2876 res_id.name[2] = lsm->lsm_object_gr;
2878 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/* Common completion for osc_enqueue: translate an intent ABORT into the
 * server's policy result, log the returned lvb contents, and invoke the
 * caller's update callback with the final rc. */
2882 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2888 /* The request was created before ldlm_cli_enqueue call. */
2889 if (rc == ELDLM_LOCK_ABORTED) {
2890 struct ldlm_reply *rep;
2891 rep = req_capsule_server_get(&req->rq_pill,
2894 LASSERT(rep != NULL);
/* intent failure reason travels in lock_policy_res1 */
2895 if (rep->lock_policy_res1)
2896 rc = rep->lock_policy_res1;
2900 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2901 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2902 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2903 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2904 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2907 /* Call the update callback. */
2908 rc = oinfo->oi_cb_up(oinfo, rc);
/* rq_interpret_reply handler for async enqueues: finish the ldlm
 * enqueue, run osc_enqueue_fini, then drop the lock reference for the
 * async request (locks are released immediately — see the comment above
 * osc_enqueue below). */
2912 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2913 struct osc_enqueue_args *aa, int rc)
2915 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2916 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2917 struct ldlm_lock *lock;
2919 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2921 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2923 /* Complete obtaining the lock procedure. */
2924 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2926 &aa->oa_oi->oi_flags,
2927 &lsm->lsm_oinfo[0]->loi_lvb,
2928 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2929 lustre_swab_ost_lvb,
2930 aa->oa_oi->oi_lockh, rc);
2932 /* Complete osc stuff. */
2933 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2935 /* Release the lock for async request. */
2936 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2937 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2939 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2940 aa->oa_oi->oi_lockh, req, aa);
2941 LDLM_LOCK_PUT(lock);
2945 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2946 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2947 * other synchronous requests, however keeping some locks and trying to obtain
2948 * others may take a considerable amount of time in a case of ost failure; and
2949 * when other sync requests do not get released lock from a client, the client
2950 * is excluded from the cluster -- such scenarious make the life difficult, so
2951 * release locks just after they are obtained. */
/* Obtain an extent lock on an object: first try to match an existing
 * lock (PR requests may be satisfied by a PW lock), otherwise build and
 * send an LDLM_ENQUEUE, synchronously or via @rqset.
 * NOTE(review): elided lines hide several branch bodies in this
 * listing; comments describe only visible statements. */
2952 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2953 struct ldlm_enqueue_info *einfo,
2954 struct ptlrpc_request_set *rqset)
2956 struct ldlm_res_id res_id = { .name = {0} };
2957 struct obd_device *obd = exp->exp_obd;
2958 struct ptlrpc_request *req = NULL;
2959 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2964 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2965 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2967 /* Filesystem lock extents are extended to page boundaries so that
2968 * dealing with the page cache is a little smoother. */
/* round start down and end up to CFS_PAGE_SIZE boundaries */
2969 oinfo->oi_policy.l_extent.start -=
2970 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2971 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2973 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2976 /* Next, search for already existing extent locks that will cover us */
2977 /* If we're trying to read, we also search for an existing PW lock. The
2978 * VFS and page cache already protect us locally, so lots of readers/
2979 * writers can share a single PW lock.
2981 * There are problems with conversion deadlocks, so instead of
2982 * converting a read lock to a write lock, we'll just enqueue a new
2985 * At some point we should cancel the read lock instead of making them
2986 * send us a blocking callback, but there are problems with canceling
2987 * locks out from other users right now, too. */
2988 mode = einfo->ei_mode;
2989 if (einfo->ei_mode == LCK_PR)
2991 mode = ldlm_lock_match(obd->obd_namespace,
2992 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2993 einfo->ei_type, &oinfo->oi_policy, mode,
2996 /* addref the lock only if not async requests and PW lock is
2997 * matched whereas we asked for PR. */
2998 if (!rqset && einfo->ei_mode != mode)
2999 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR)
3000 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3003 /* I would like to be able to ASSERT here that rss <=
3004 * kms, but I can't, for reasons which are explained in
3008 /* We already have a lock, and it's referenced */
3009 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3011 /* For async requests, decref the lock. */
3012 if (einfo->ei_mode != mode)
3013 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3015 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
/* no match: build an enqueue request (with lvb reply buffer) */
3022 CFS_LIST_HEAD(cancels);
3023 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3024 &RQF_LDLM_ENQUEUE_LVB);
3028 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3032 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3033 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3034 ptlrpc_request_set_replen(req);
3037 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3038 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3040 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3041 &oinfo->oi_policy, &oinfo->oi_flags,
3042 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3043 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3044 lustre_swab_ost_lvb, oinfo->oi_lockh,
/* async path: stash args and let osc_enqueue_interpret finish up */
3048 struct osc_enqueue_args *aa;
3049 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3050 aa = (struct osc_enqueue_args *)&req->rq_async_args;
3055 req->rq_interpret_reply = osc_enqueue_interpret;
3056 ptlrpc_set_add_req(rqset, req);
3057 } else if (intent) {
3058 ptlrpc_req_finished(req);
3063 rc = osc_enqueue_fini(req, oinfo, intent, rc);
3065 ptlrpc_req_finished(req);
/* Match an existing extent lock for this object without enqueuing.  As
 * in osc_enqueue, a PR request may be satisfied by a PW lock; in that
 * case (and when not TEST_LOCK) the reference is converted PW -> PR. */
3070 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3071 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3072 int *flags, void *data, struct lustre_handle *lockh)
3074 struct ldlm_res_id res_id = { .name = {0} };
3075 struct obd_device *obd = exp->exp_obd;
3076 int lflags = *flags;
3080 res_id.name[0] = lsm->lsm_object_id;
3081 res_id.name[2] = lsm->lsm_object_gr;
/* fault-injection hook used by tests */
3083 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3086 /* Filesystem lock extents are extended to page boundaries so that
3087 * dealing with the page cache is a little smoother */
3088 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3089 policy->l_extent.end |= ~CFS_PAGE_MASK;
3091 /* Next, search for already existing extent locks that will cover us */
3092 /* If we're trying to read, we also search for an existing PW lock. The
3093 * VFS and page cache already protect us locally, so lots of readers/
3094 * writers can share a single PW lock. */
3098 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3099 &res_id, type, policy, rc, lockh)
3101 osc_set_data_with_check(lockh, data, lflags);
3102 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3103 ldlm_lock_addref(lockh, LCK_PR);
3104 ldlm_lock_decref(lockh, LCK_PW);
/* Drop a lock reference; GROUP locks are additionally cancelled since
 * they are never kept in the LRU. */
3111 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3112 __u32 mode, struct lustre_handle *lockh)
3116 if (unlikely(mode == LCK_GROUP))
3117 ldlm_lock_decref_and_cancel(lockh, mode);
3119 ldlm_lock_decref(lockh, mode);
/* Cancel unused locks in this namespace; with a stripe md, restrict to
 * that object's resource (resp appears to be set from &res_id on a path
 * elided from this listing — NULL means "all resources"). */
3124 static int osc_cancel_unused(struct obd_export *exp,
3125 struct lov_stripe_md *lsm, int flags,
3128 struct obd_device *obd = class_exp2obd(exp);
3129 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3132 res_id.name[0] = lsm->lsm_object_id;
3133 res_id.name[2] = lsm->lsm_object_gr;
3137 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Join or leave the lock LRU for this object's resource; mirrors
 * osc_cancel_unused's res_id/resp handling (resp assignment elided). */
3140 static int osc_join_lru(struct obd_export *exp,
3141 struct lov_stripe_md *lsm, int join)
3143 struct obd_device *obd = class_exp2obd(exp);
3144 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3147 res_id.name[0] = lsm->lsm_object_id;
3148 res_id.name[2] = lsm->lsm_object_gr;
3152 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
/* Reply handler for async statfs: unpack the server's obd_statfs, copy
 * it to the caller's buffer, then invoke the caller's callback. */
3155 static int osc_statfs_interpret(struct ptlrpc_request *req,
3156 struct osc_async_args *aa, int rc)
3158 struct obd_statfs *msfs;
3164 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* missing reply buffer -> protocol error */
3166 GOTO(out, rc = -EPROTO);
3169 *aa->aa_oi->oi_osfs = *msfs;
3171 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous OST_STATFS: pack the request, attach the interpret
 * callback, and add it to @rqset.  max_age is currently unused on the
 * wire (see the comment below). */
3175 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3176 __u64 max_age, struct ptlrpc_request_set *rqset)
3178 struct ptlrpc_request *req;
3179 struct osc_async_args *aa;
3183 /* We could possibly pass max_age in the request (as an absolute
3184 * timestamp or a "seconds.usec ago") so the target can avoid doing
3185 * extra calls into the filesystem if that isn't necessary (e.g.
3186 * during mount that would help a bit). Having relative timestamps
3187 * is not so great if request processing is slow, while absolute
3188 * timestamps are not ideal because they need time synchronization. */
3189 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3193 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3195 ptlrpc_request_free(req);
3198 ptlrpc_request_set_replen(req);
3199 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3201 req->rq_interpret_reply = osc_statfs_interpret;
3202 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3203 aa = (struct osc_async_args *)&req->rq_async_args;
3206 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: send, wait, and unpack the obd_statfs reply
 * (copy into @osfs happens on a line elided from this listing). */
3210 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3213 struct obd_statfs *msfs;
3214 struct ptlrpc_request *req;
3218 /* We could possibly pass max_age in the request (as an absolute
3219 * timestamp or a "seconds.usec ago") so the target can avoid doing
3220 * extra calls into the filesystem if that isn't necessary (e.g.
3221 * during mount that would help a bit). Having relative timestamps
3222 * is not so great if request processing is slow, while absolute
3223 * timestamps are not ideal because they need time synchronization. */
3224 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3228 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3230 ptlrpc_request_free(req);
3233 ptlrpc_request_set_replen(req);
3234 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3236 rc = ptlrpc_queue_wait(req);
3240 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3242 GOTO(out, rc = -EPROTO);
3249 ptlrpc_req_finished(req);
3253 /* Retrieve object striping information.
3255 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3256 * the maximum number of OST indices which will fit in the user buffer.
3257 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
/* Copies a single-stripe lov_user_md back to userspace; object slot 0
 * is filled only when the caller asked for at least one stripe. */
3259 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3261 struct lov_user_md lum, *lumk;
3262 int rc = 0, lum_size;
3268 if (copy_from_user(&lum, lump, sizeof(lum)))
3271 if (lum.lmm_magic != LOV_USER_MAGIC)
3274 if (lum.lmm_stripe_count > 0) {
/* room for the header plus exactly one object entry */
3275 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3276 OBD_ALLOC(lumk, lum_size);
3280 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3281 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3283 lum_size = sizeof(lum);
3287 lumk->lmm_object_id = lsm->lsm_object_id;
3288 lumk->lmm_object_gr = lsm->lsm_object_gr;
3289 lumk->lmm_stripe_count = 1;
3291 if (copy_to_user(lump, lumk, lum_size))
/* free path guarded elsewhere in the elided lines; lumk may alias &lum
 * in the zero-stripe case — TODO confirm against full source */
3295 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device.  Pins the module for the call's
 * duration; each case delegates to the matching subsystem. */
3301 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3302 void *karg, void *uarg)
3304 struct obd_device *obd = exp->exp_obd;
3305 struct obd_ioctl_data *data = karg;
3309 if (!try_module_get(THIS_MODULE)) {
3310 CERROR("Can't get module. Is it alive?");
3314 case OBD_IOC_LOV_GET_CONFIG: {
3316 struct lov_desc *desc;
3317 struct obd_uuid uuid;
3321 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3322 GOTO(out, err = -EINVAL);
3324 data = (struct obd_ioctl_data *)buf;
/* validate user-supplied buffer sizes before filling them */
3326 if (sizeof(*desc) > data->ioc_inllen1) {
3327 obd_ioctl_freedata(buf, len);
3328 GOTO(out, err = -EINVAL);
3331 if (data->ioc_inllen2 < sizeof(uuid)) {
3332 obd_ioctl_freedata(buf, len);
3333 GOTO(out, err = -EINVAL);
/* an OSC always presents itself as a one-target "LOV" */
3336 desc = (struct lov_desc *)data->ioc_inlbuf1;
3337 desc->ld_tgt_count = 1;
3338 desc->ld_active_tgt_count = 1;
3339 desc->ld_default_stripe_count = 1;
3340 desc->ld_default_stripe_size = 0;
3341 desc->ld_default_stripe_offset = 0;
3342 desc->ld_pattern = 0;
3343 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3345 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3347 err = copy_to_user((void *)uarg, buf, len);
3350 obd_ioctl_freedata(buf, len);
3353 case LL_IOC_LOV_SETSTRIPE:
3354 err = obd_alloc_memmd(exp, karg);
3358 case LL_IOC_LOV_GETSTRIPE:
3359 err = osc_getstripe(karg, uarg);
3361 case OBD_IOC_CLIENT_RECOVER:
3362 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3367 case IOC_OSC_SET_ACTIVE:
3368 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3371 case OBD_IOC_POLL_QUOTACHECK:
3372 err = lquota_poll_check(quota_interface, exp,
3373 (struct if_quotacheck *)karg);
3376 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3377 cmd, cfs_curproc_comm());
3378 GOTO(out, err = -ENOTTY);
3381 module_put(THIS_MODULE);
/* obd_get_info handler: "lock_to_stripe" is answered locally (an OSC is
 * always stripe 0); "last_id" sends an OST_GET_INFO RPC and unpacks the
 * returned obd_id. */
3385 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3386 void *key, __u32 *vallen, void *val)
3389 if (!vallen || !val)
3392 if (KEY_IS("lock_to_stripe")) {
3393 __u32 *stripe = val;
3394 *vallen = sizeof(*stripe);
3397 } else if (KEY_IS("last_id")) {
3398 struct ptlrpc_request *req;
3403 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3408 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3409 RCL_CLIENT, keylen);
3410 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3412 ptlrpc_request_free(req);
3416 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3418 memcpy(tmp, key, keylen);
3420 req_capsule_set_size(&req->rq_pill, &RMF_OBD_ID,
3421 RCL_SERVER, *vallen);
3422 ptlrpc_request_set_replen(req);
3423 rc = ptlrpc_queue_wait(req);
3427 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3429 GOTO(out, rc = -EPROTO);
3431 *((obd_id *)val) = *reply;
3433 ptlrpc_req_finished(req);
/* Completion for the KEY_MDS_CONN set_info RPC: connect the llog
 * initiator for the MDS-OST originator context and mark the import as
 * server-timeout/pingable (this OSC lives on the MDS). */
3439 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3442 struct llog_ctxt *ctxt;
3443 struct obd_import *imp = req->rq_import;
3449 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3452 rc = llog_initiator_connect(ctxt);
3454 CERROR("cannot establish connection for "
3455 "ctxt %p: %d\n", ctxt, rc);
3458 spin_lock(&imp->imp_lock);
3459 imp->imp_server_timeout = 1;
3460 imp->imp_pingable = 1;
3461 spin_unlock(&imp->imp_lock);
3462 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* obd_set_info_async handler.  Several keys are handled locally
 * (NEXT_ID, "unlinked", INIT_RECOV, "checksum", FLUSH_CTX); everything
 * else is forwarded to the OST as an OST_SET_INFO RPC on @set. */
3467 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3468 void *key, obd_count vallen, void *val,
3469 struct ptlrpc_request_set *set)
3471 struct ptlrpc_request *req;
3472 struct obd_device *obd = exp->exp_obd;
3473 struct obd_import *imp = class_exp2cliimp(exp);
3478 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* local: remember the next object id the creator should use */
3480 if (KEY_IS(KEY_NEXT_ID)) {
3481 if (vallen != sizeof(obd_id))
3483 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3484 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3485 exp->exp_obd->obd_name,
3486 obd->u.cli.cl_oscc.oscc_next_id);
/* local: an unlink freed space, clear the creator's NOSPC flag */
3491 if (KEY_IS("unlinked")) {
3492 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3493 spin_lock(&oscc->oscc_lock);
3494 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3495 spin_unlock(&oscc->oscc_lock);
3499 if (KEY_IS(KEY_INIT_RECOV)) {
3500 if (vallen != sizeof(int))
3502 spin_lock(&imp->imp_lock);
3503 imp->imp_initial_recov = *(int *)val;
3504 spin_unlock(&imp->imp_lock);
3505 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3506 exp->exp_obd->obd_name,
3507 imp->imp_initial_recov);
3511 if (KEY_IS("checksum")) {
3512 if (vallen != sizeof(int))
3514 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3518 if (KEY_IS(KEY_FLUSH_CTX)) {
3519 sptlrpc_import_flush_my_ctx(imp);
3526 /* We pass all other commands directly to OST. Since nobody calls osc
3527 methods directly and everybody is supposed to go through LOV, we
3528 assume lov checked invalid values for us.
3529 The only recognised values so far are evict_by_nid and mds_conn.
3530 Even if something bad goes through, we'd get a -EINVAL from OST
3534 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3538 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3539 RCL_CLIENT, keylen);
3540 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3541 RCL_CLIENT, vallen);
3542 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3544 ptlrpc_request_free(req);
3548 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3549 memcpy(tmp, key, keylen);
3550 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3551 memcpy(tmp, val, vallen);
/* MDS connection: record the object group and hook the interpret
 * callback that wires up llog + pinging (see above) */
3553 if (KEY_IS(KEY_MDS_CONN)) {
3554 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3556 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3557 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3558 LASSERT(oscc->oscc_oa.o_gr > 0);
3559 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3562 ptlrpc_request_set_replen(req);
3563 ptlrpc_set_add_req(set, req);
3564 ptlrpc_check_set(set);
/* llog ops for the size-replication context: only cancel is needed on
 * the client side. */
3570 static struct llog_operations osc_size_repl_logops = {
3571 lop_cancel: llog_obd_repl_cancel
/* Originator ops are built lazily from llog_lvfs_ops the first time
 * osc_llog_init runs (guarded by obd_dev_lock below). */
3574 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses: MDS-OST originator and
 * size-replication.  Errors are logged with full identifying detail. */
3575 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3576 struct obd_device *tgt, int count,
3577 struct llog_catid *catid, struct obd_uuid *uuid)
3582 spin_lock(&obd->obd_dev_lock);
3583 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3584 osc_mds_ost_orig_logops = llog_lvfs_ops;
3585 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3586 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3587 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3588 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3590 spin_unlock(&obd->obd_dev_lock);
3592 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3593 &catid->lci_logid, &osc_mds_ost_orig_logops);
3595 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3599 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3600 &osc_size_repl_logops);
3602 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3605 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3606 obd->obd_name, tgt->obd_name, count, catid, rc);
3607 CERROR("logid "LPX64":0x%x\n",
3608 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* Tear down both llog contexts created by osc_llog_init; the two
 * cleanup results are tracked separately (rc / rc2). */
3613 static int osc_llog_finish(struct obd_device *obd, int count)
3615 struct llog_ctxt *ctxt;
3616 int rc = 0, rc2 = 0;
3619 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3621 rc = llog_cleanup(ctxt);
3623 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3625 rc2 = llog_cleanup(ctxt);
/* Reconnect hook: re-request our outstanding grant from the server.  If
 * no grant is held, ask for two max-size RPCs' worth; lost grant is
 * zeroed under the list lock and only reported in the debug message. */
3632 static int osc_reconnect(const struct lu_env *env,
3633 struct obd_export *exp, struct obd_device *obd,
3634 struct obd_uuid *cluuid,
3635 struct obd_connect_data *data)
3637 struct client_obd *cli = &obd->u.cli;
3639 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3642 client_obd_list_lock(&cli->cl_loi_list_lock);
3643 data->ocd_grant = cli->cl_avail_grant ?:
3644 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3645 lost_grant = cli->cl_lost_grant;
3646 cli->cl_lost_grant = 0;
3647 client_obd_list_unlock(&cli->cl_loi_list_lock);
3649 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3650 "cl_lost_grant: %ld\n", data->ocd_grant,
3651 cli->cl_avail_grant, lost_grant);
3652 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3653 " ocd_grant: %d\n", data->ocd_connect_flags,
3654 data->ocd_version, data->ocd_grant);
/* Disconnect: on the last connection, flush pending size-repl llog
 * cancels to the target before the generic client disconnect. */
3660 static int osc_disconnect(struct obd_export *exp)
3662 struct obd_device *obd = class_exp2obd(exp);
3663 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3666 if (obd->u.cli.cl_conn_count == 1)
3667 /* flush any remaining cancel messages out to the target */
3668 llog_sync(ctxt, exp);
3670 rc = client_disconnect_export(exp);
/* React to import state changes: reset grant on disconnect, fail out
 * cached pages and flush local locks on invalidation, clear the
 * creator's NOSPC flag on (re)activation, and apply connect-data
 * driven settings (grant init, request portal) on OCD. */
3674 static int osc_import_event(struct obd_device *obd,
3675 struct obd_import *imp,
3676 enum obd_import_event event)
3678 struct client_obd *cli;
3682 LASSERT(imp->imp_obd == obd);
3685 case IMP_EVENT_DISCON: {
3686 /* Only do this on the MDS OSC's */
3687 if (imp->imp_server_timeout) {
3688 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3690 spin_lock(&oscc->oscc_lock);
3691 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3692 spin_unlock(&oscc->oscc_lock);
/* grant is meaningless across a disconnect; drop it */
3695 client_obd_list_lock(&cli->cl_loi_list_lock);
3696 cli->cl_avail_grant = 0;
3697 cli->cl_lost_grant = 0;
3698 client_obd_list_unlock(&cli->cl_loi_list_lock);
3701 case IMP_EVENT_INACTIVE: {
3702 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3705 case IMP_EVENT_INVALIDATE: {
3706 struct ldlm_namespace *ns = obd->obd_namespace;
3710 client_obd_list_lock(&cli->cl_loi_list_lock);
3711 /* all pages go to failing rpcs due to the invalid import */
3712 osc_check_rpcs(cli);
3713 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* throw away this namespace's local locks */
3715 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3719 case IMP_EVENT_ACTIVE: {
3720 /* Only do this on the MDS OSC's */
3721 if (imp->imp_server_timeout) {
3722 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3724 spin_lock(&oscc->oscc_lock);
3725 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3726 spin_unlock(&oscc->oscc_lock);
3728 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3731 case IMP_EVENT_OCD: {
3732 struct obd_connect_data *ocd = &imp->imp_connect_data;
3734 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3735 osc_init_grant(&obd->u.cli, ocd);
3738 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3739 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3741 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3745 CERROR("Unknown import event %d\n", event);
/* Set up an OSC obd_device: take a ptlrpcd reference, run the generic
 * client setup, attach lprocfs statistics and pre-size the request pool.
 * NOTE(review): error-checking branches, the opening/closing braces and
 * the return are missing from this excerpt; code left byte-identical. */
3751 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
/* Hold a reference on the shared ptlrpcd threads for this device. */
3757 rc = ptlrpcd_addref();
3761 rc = client_obd_setup(obd, lcfg);
3765 struct lprocfs_static_vars lvars = { 0 };
3766 struct client_obd *cli = &obd->u.cli;
/* Register /proc entries; failures here are non-fatal to setup. */
3768 lprocfs_osc_init_vars(&lvars);
3769 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3770 lproc_osc_attach_seqstat(obd);
3771 sptlrpc_lprocfs_cliobd_attach(obd);
3772 ptlrpc_lprocfs_register_obd(obd);
3776 /* We need to allocate a few requests more, because
3777 brw_interpret_oap tries to create new requests before freeing
3778 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3779 reserved, but I'm afraid that might be too much wasted RAM
3780 in fact, so 2 is just my guess and still should work. */
3781 cli->cl_import->imp_rq_pool =
3782 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3784 ptlrpc_add_rqs_to_pool);
/* Staged pre-cleanup of an OSC device, driven by the obd cleanup state
 * machine: deactivate the import early, free an unconnected import at
 * export cleanup, and shut down llog before final obd cleanup.
 * NOTE(review): the `switch (stage)` line, `break`s and braces are
 * missing from this excerpt; code left byte-identical. */
3790 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3796 case OBD_CLEANUP_EARLY: {
3797 struct obd_import *imp;
3798 imp = obd->u.cli.cl_import;
3799 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3800 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3801 ptlrpc_deactivate_import(imp);
/* Stop pinging the target; it is going away. */
3802 spin_lock(&imp->imp_lock);
3803 imp->imp_pingable = 0;
3804 spin_unlock(&imp->imp_lock);
3807 case OBD_CLEANUP_EXPORTS: {
3808 /* If we set up but never connected, the
3809 client import will not have been cleaned. */
3810 if (obd->u.cli.cl_import) {
3811 struct obd_import *imp;
3812 imp = obd->u.cli.cl_import;
3813 CDEBUG(D_CONFIG, "%s: client import never connected\n",
/* Fail any queued requests, release the pre-sized request pool and
 * destroy the never-connected import. */
3815 ptlrpc_invalidate_import(imp);
3816 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3817 class_destroy_import(imp);
3818 obd->u.cli.cl_import = NULL;
3822 case OBD_CLEANUP_SELF_EXP:
3823 rc = obd_llog_finish(obd, 0);
3825 CERROR("failed to cleanup llogging subsystems\n");
3827 case OBD_CLEANUP_OBD:
/* Final cleanup of an OSC device: remove /proc entries, mark the object
 * creator as exiting, release quota state and run generic client cleanup.
 * NOTE(review): opening brace, `rc` declaration and final return are
 * missing from this excerpt; code left byte-identical. */
3833 int osc_cleanup(struct obd_device *obd)
3835 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3839 ptlrpc_lprocfs_unregister_obd(obd);
3840 lprocfs_obd_cleanup(obd);
/* Flip the creator from "recovering" to "exiting" so no further
 * precreate work is started. */
3842 spin_lock(&oscc->oscc_lock);
3843 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3844 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3845 spin_unlock(&oscc->oscc_lock);
3847 /* free memory of osc quota cache */
3848 lquota_cleanup(quota_interface, obd);
3850 rc = client_obd_cleanup(obd);
/* Apply a runtime configuration record to this OSC: sptlrpc flavor
 * changes are handled specially, everything else is treated as an
 * lprocfs parameter write.
 * NOTE(review): the default-case label, `break`s and braces are missing
 * from this excerpt; code left byte-identical. */
3856 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3858 struct lustre_cfg *lcfg = buf;
3859 struct lprocfs_static_vars lvars = { 0 };
3862 lprocfs_osc_init_vars(&lvars);
3864 switch (lcfg->lcfg_command) {
3865 case LCFG_SPTLRPC_CONF:
3866 rc = sptlrpc_cliobd_process_config(obd, lcfg);
/* Fall-back path: route the record through the generic proc-param
 * parser using the OSC parameter namespace. */
3869 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* OBD method table for the OSC. Generic client_* helpers handle
 * connection management; osc_* entry points implement object, I/O,
 * lock and llog operations against the OST.
 * NOTE(review): the closing "};" of this initializer is not visible in
 * this excerpt; code left byte-identical. */
3877 struct obd_ops osc_obd_ops = {
3878 .o_owner = THIS_MODULE,
3879 .o_setup = osc_setup,
3880 .o_precleanup = osc_precleanup,
3881 .o_cleanup = osc_cleanup,
3882 .o_add_conn = client_import_add_conn,
3883 .o_del_conn = client_import_del_conn,
3884 .o_connect = client_connect_import,
3885 .o_reconnect = osc_reconnect,
3886 .o_disconnect = osc_disconnect,
3887 .o_statfs = osc_statfs,
3888 .o_statfs_async = osc_statfs_async,
3889 .o_packmd = osc_packmd,
3890 .o_unpackmd = osc_unpackmd,
3891 .o_precreate = osc_precreate,
3892 .o_create = osc_create,
3893 .o_destroy = osc_destroy,
3894 .o_getattr = osc_getattr,
3895 .o_getattr_async = osc_getattr_async,
3896 .o_setattr = osc_setattr,
3897 .o_setattr_async = osc_setattr_async,
3899 .o_brw_async = osc_brw_async,
3900 .o_prep_async_page = osc_prep_async_page,
3901 .o_queue_async_io = osc_queue_async_io,
3902 .o_set_async_flags = osc_set_async_flags,
3903 .o_queue_group_io = osc_queue_group_io,
3904 .o_trigger_group_io = osc_trigger_group_io,
3905 .o_teardown_async_page = osc_teardown_async_page,
3906 .o_punch = osc_punch,
3908 .o_enqueue = osc_enqueue,
3909 .o_match = osc_match,
3910 .o_change_cbdata = osc_change_cbdata,
3911 .o_cancel = osc_cancel,
3912 .o_cancel_unused = osc_cancel_unused,
3913 .o_join_lru = osc_join_lru,
3914 .o_iocontrol = osc_iocontrol,
3915 .o_get_info = osc_get_info,
3916 .o_set_info_async = osc_set_info_async,
3917 .o_import_event = osc_import_event,
3918 .o_llog_init = osc_llog_init,
3919 .o_llog_finish = osc_llog_finish,
3920 .o_process_config = osc_process_config,
/* Module init: hook up the optional quota interface, then register the
 * OSC obd type with the class driver. The quota symbol reference is
 * dropped again if registration fails.
 * NOTE(review): opening brace, `rc` declaration, error check and return
 * are missing from this excerpt; code left byte-identical. */
3922 int __init osc_init(void)
3924 struct lprocfs_static_vars lvars = { 0 };
3928 lprocfs_osc_init_vars(&lvars);
/* Quota support is optional: pull in the lquota module if present and
 * splice its operations into osc_obd_ops. */
3930 request_module("lquota");
3931 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3932 lquota_init(quota_interface);
3933 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3935 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3936 LUSTRE_OSC_NAME, NULL);
/* Registration failed: release the quota symbol reference taken above. */
3938 if (quota_interface)
3939 PORTAL_SYMBOL_PUT(osc_quota_interface);
/* Module exit: shut down quota support, drop the quota symbol reference
 * and unregister the OSC obd type.
 * NOTE(review): braces are missing from this excerpt; code left as-is. */
3947 static void /*__exit*/ osc_exit(void)
3949 lquota_exit(quota_interface);
3950 if (quota_interface)
3951 PORTAL_SYMBOL_PUT(osc_quota_interface);
3953 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata and the Lustre wrapper that wires
 * osc_init/osc_exit up as the module's init/exit entry points. */
3956 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3957 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3958 MODULE_LICENSE("GPL");
3960 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);