/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
 * Author Peter Braam <braam@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 *
 * For testing and management it is treated as an obd_device,
 * although it does not export a full OBD method table (the
 * requests are coming in over the wire, so object target modules
 * do not have a full method table.)
 */
# define EXPORT_SYMTAB

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

# include <liblustre.h>

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_cache.h>
#include "osc_internal.h"
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)

        lmm_size = sizeof(**lmmp);

        OBD_FREE(*lmmp, lmm_size);

        OBD_ALLOC(*lmmp, lmm_size);

        LASSERT(lsm->lsm_object_id);
        LASSERT(lsm->lsm_object_gr);
        (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
        (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)

        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));

        /* XXX LOV_MAGIC etc check? */

        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");

        lsm_size = lov_stripe_md_size(1);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);

        OBD_ALLOC(*lsmp, lsm_size);

        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
        if ((*lsmp)->lsm_oinfo[0] == NULL) {
                OBD_FREE(*lsmp, lsm_size);

        loi_init((*lsmp)->lsm_oinfo[0]);

        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
        (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
        LASSERT((*lsmp)->lsm_object_id);
        LASSERT((*lsmp)->lsm_object_gr);

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
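/*
 * Illustrative sketch (compiled out, not part of the original code): the
 * byte-order round trip that osc_packmd()/osc_unpackmd() rely on.
 * "demo_md" is a hypothetical stand-in for struct lov_mds_md;
 * cpu_to_le64()/le64_to_cpu() are the usual kernel byte-order helpers.
 */
#if 0 /* illustration only */
struct demo_md {
        __u64 dm_object_id;             /* stored little-endian on disk */
};

static void demo_md_roundtrip(__u64 id)
{
        struct demo_md md;

        md.dm_object_id = cpu_to_le64(id);              /* pack: CPU -> LE */
        LASSERT(le64_to_cpu(md.dm_object_id) == id);    /* unpack: LE -> CPU */
}
#endif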
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)

        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);

        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)

        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, body, oinfo->oi_capa);

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,

                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);

        /* it is already calculated as sizeof struct obd_capa */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)

        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

        /* This should really be sent by the OST */
        aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_INFO, "can't unpack ost_body\n");

        aa->aa_oi->oi_oa->o_valid = 0;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)

        struct ptlrpc_request *req;
        struct osc_async_args *aa;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);

                ptlrpc_request_free(req);

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);

        ptlrpc_set_add_req(set, req);
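/*
 * Usage sketch (compiled out; an assumption, not original code): how a
 * caller might drive osc_getattr_async() to completion with a private
 * request set, using the standard ptlrpc set primitives.
 */
#if 0 /* illustration only */
static int demo_getattr_via_set(struct obd_export *exp,
                                struct obd_info *oinfo)
{
        struct ptlrpc_request_set *set;
        int rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        rc = osc_getattr_async(exp, oinfo, set);
        if (rc == 0)
                rc = ptlrpc_set_wait(set);      /* sends, waits, interprets */
        ptlrpc_set_destroy(set);
        return rc;
}
#endif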
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)

        struct ptlrpc_request *req;
        struct ost_body *body;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);

                ptlrpc_request_free(req);

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        ptlrpc_req_finished(req);
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)

        struct ptlrpc_request *req;
        struct ost_body *body;

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);

                ptlrpc_request_free(req);

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        ptlrpc_req_finished(req);

static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)

        struct ost_body *body;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)

        struct ptlrpc_request *req;
        struct osc_async_args *aa;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);

                ptlrpc_request_free(req);

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {

                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;

        /* do MDS-to-OST setattr asynchronously */

                /* Do not wait for response. */
                ptlrpcd_add_req(req);

                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);

                ptlrpc_set_add_req(rqset, req);
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)

        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;

        rc = obd_alloc_memmd(exp, &lsm);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);

                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);

                ptlrpc_request_free(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        ptlrpc_request_set_replen(req);

        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);

                       "delorphan from OST integration");
                /* Don't resend the delorphan request */
                req->rq_no_resend = req->rq_no_delay = 1;

        rc = ptlrpc_queue_wait(req);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

                GOTO(out_req, rc = -EPROTO);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way. */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;

        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                *oti->oti_logcookies = *obdo_logcookie(oa);

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));

        ptlrpc_req_finished(req);

        obd_free_memmd(exp, &lsm);
static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)

        struct ost_body *body;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)

        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;

                CDEBUG(D_INFO, "oa NULL\n");

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);

                ptlrpc_request_free(req);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        osc_pack_req_body(req, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);

        ptlrpc_set_add_req(rqset, req);
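/*
 * Worked example (illustrative, not original code): OST_PUNCH, and
 * OST_SYNC below, reuse oa->o_size/oa->o_blocks to carry the extent
 * [start, end].  For a truncate down to 4096 bytes the packed body
 * would carry
 *
 *      body->oa.o_size   = 4096;               (l_extent.start)
 *      body->oa.o_blocks = OBD_OBJECT_EOF;     (l_extent.end)
 *      body->oa.o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 *
 * i.e. "punch everything from offset 4096 to the end of the object".
 */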
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,

        struct ptlrpc_request *req;
        struct ost_body *body;

                CDEBUG(D_INFO, "oa NULL\n");

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);

                ptlrpc_request_free(req);

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

                GOTO(out, rc = -EPROTO);

        ptlrpc_req_finished(req);
/* Locally find and cancel locks matched by @mode in the resource found by
 * @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,

        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);
static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,

        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);

static int osc_can_send_destroy(struct client_obd *cli)

        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */

        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
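/*
 * Sketch (compiled out; an assumption, not original code) of the
 * optimistic inc-then-check throttle used by osc_can_send_destroy(),
 * reduced to plain atomics.  The second check after atomic_dec_return()
 * closes the window where another RPC completed between our two
 * operations, so a sleeping waiter is never left unwoken.
 * demo_wake_waiters() is a hypothetical stand-in for signalling the
 * destroy waitq.
 */
#if 0 /* illustration only */
static int demo_try_enter(atomic_t *in_flight, int limit)
{
        if (atomic_inc_return(in_flight) <= limit)
                return 1;                       /* slot acquired, send now */
        /* over the limit: back out; if a slot freed meanwhile, wake one */
        if (atomic_dec_return(in_flight) < limit)
                demo_wake_waiters();            /* hypothetical helper */
        return 0;                               /* caller must wait and retry */
}
#endif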
/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about it.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and synced to disk (i.e. the transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing the
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)

        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        CFS_LIST_HEAD(cancels);

                CDEBUG(D_INFO, "oa NULL\n");

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);

                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,

                ptlrpc_request_free(req);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        ptlrpc_request_set_replen(req);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);

        /* Do not wait for response */
        ptlrpcd_add_req(req);
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,

        obd_flag bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);

        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);

        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);

                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);

        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)

        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
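/*
 * Worked example (illustrative, not original code): with
 * CFS_PAGE_SIZE = 4096, caching one more dirty page moves a page of
 * grant from "available" to "dirty":
 *
 *      before: cl_avail_grant = 1048576, cl_dirty = 0
 *      after : cl_avail_grant = 1044480, cl_dirty = 4096
 *
 * The LASSERT above keeps the client from consuming grant the OST never
 * promised, which is what prevents cached writes from overcommitting
 * OST space.
 */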
/* The companion to osc_consume_write_grant, called when a brw has
 * completed. Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)

        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;

                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong. Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);

                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
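/*
 * Worked example (illustrative, not original code) for the short-write
 * branch above, with CFS_PAGE_SIZE = 4096 and an OST blocksize of 1024:
 * a write of pga->count = 100 bytes at in-page offset 3000 touches two
 * 1024-byte blocks (bytes 2048..4095 of the page), so
 *
 *      count = 100 + (3000 & 1023)     = 1052
 *      end   = (3000 + 100) & 1023     = 28
 *      count += 1024 - 28              -> 2048
 *      cl_lost_grant += 4096 - 2048    = 2048
 *
 * The page consumed a full page of grant on the client but only two
 * blocks on the OST; the difference is reported back as lost grant.
 */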
static unsigned long rpcs_in_flight(struct client_obd *cli)

        return cli->cl_r_in_flight + cli->cl_w_in_flight;

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)

        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);

                /* if there is still dirty cache but no grant, wait for pending
                 * RPCs that may yet return us some grant before doing sync
                 * writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);

                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);

                cfs_waitq_signal(&ocw->ocw_waitq);
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)

        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)

        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);

                nob_read -= pga[i]->count;

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");

        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)

        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);

        return (p1->off + p1->count == p2->off);
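/*
 * Sketch (compiled out, not original code) of the contiguity rule above:
 * two brw_pages merge into one remote niobuf only when the first ends
 * exactly where the second begins (and their flags are compatible).
 */
#if 0 /* illustration only */
static void demo_merge(void)
{
        struct brw_page a = { .off = 0,     .count = 4096 };
        struct brw_page b = { .off = 4096,  .count = 4096 };
        struct brw_page c = { .off = 12288, .count = 4096 };

        LASSERT(can_merge_pages(&a, &b));       /* contiguous: merges */
        LASSERT(!can_merge_pages(&b, &c));      /* 4 KiB hole: no merge */
}
#endif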
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",

                nob -= pga[i]->count;

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)

        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {

                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,

                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);

                ptlrpc_request_free(req);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);

        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);

                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",

                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);

                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);

                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,

                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {

                        niobuf->len += pg->count;

                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;

        LASSERTF((void *)(niobuf - niocount) ==
                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf)),
                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                 (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        /* size[REQ_REC_OFF] is still sizeof(*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,

                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",

                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);

                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;

                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);

                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */

        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);

        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;

        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        ptlrpc_req_finished(req);
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)

        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :

                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,

                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)

        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT)

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);

                CDEBUG(D_INFO, "Can't unpack body\n");

        /* set/clear the over-quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {

                        CERROR("Unexpected +ve rc %d\n", rc);

                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,

                GOTO(out, rc = -EAGAIN);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;

                cksum_type_t cksum_type;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,

                if (peer->nid == req->rq_bulk->bd_sender) {

                        router = libcfs_nid2str(req->rq_bulk->bd_sender);

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),

                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation : (__u64)0,

                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -

                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);

                        aa->aa_oa->o_cksum = client_cksum;

                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));

        *aa->aa_oa = body->oa;
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)

        struct ptlrpc_request *req;

        struct l_wait_info lwi;

        cfs_waitq_init(&waitq);

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {

                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");

                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)

        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */

        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);

        /* Using ptlrpc_set_add_req here is safe because the interpret
         * functions run in check_set context. The only path on which another
         * thread can access the request and see -EINTR is protected by the
         * cl_loi_list_lock. */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          struct obd_capa *ocapa)

        struct ptlrpc_request *req;
        struct client_obd *cli = &exp->exp_obd->u.cli;

        struct osc_brw_async_args *aa;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);

                spin_unlock(&cli->cl_loi_list_lock);

        rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,

                aa = ptlrpc_req_async_args(req);
                if (cmd == OBD_BRW_READ) {
                        lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                        ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);

                        lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                         cli->cl_w_in_flight);
                        ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);

                LASSERT(list_empty(&aa->aa_oaps));

                req->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, req);
                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (cmd == OBD_BRW_READ)
                        cli->cl_r_in_flight++;

                        cli->cl_w_in_flight++;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
        } else if (cmd == OBD_BRW_WRITE) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                osc_wake_cache_waiters(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)

        struct brw_page *tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)

        do {

                for (i = stride; i < num; i++) {

                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];

        } while (stride > 1);
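/*
 * Self-contained sketch (compiled out, not original code) of the same
 * shellsort on plain integers: grow the stride through the 3x+1
 * sequence (1, 4, 13, 40, ...), then run insertion-sort passes over
 * elements one stride apart while the stride shrinks back to 1.
 */
#if 0 /* illustration only */
static void demo_shellsort(int *a, int num)
{
        int stride, i, j, tmp;

        if (num <= 1)
                return;
        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;
        do {
                stride /= 3;
                /* one insertion-sort pass over elements a stride apart */
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}
#endif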
static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)

        LASSERT(pages > 0);
        offset = pg[i]->off & ~CFS_PAGE_MASK;

        if (pages == 0) /* that's all */

        if (offset + pg[i]->count < CFS_PAGE_SIZE)
                return count; /* doesn't end on page boundary */

        offset = pg[i]->off & ~CFS_PAGE_MASK;
        if (offset != 0) /* doesn't start on page boundary */
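/*
 * Worked example (illustrative, not original code): with 4096-byte
 * pages, a run of brw_pages covering [0,4096) [4096,8192) [8192,10240)
 * [12288,16384) makes max_unfragmented_pages() return 3: the third
 * page ends mid-page, so including anything after it would hand the
 * network a "fragmented" array it cannot move in a single RDMA
 * transfer, and page four must start a new RPC.
 */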
static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)

        struct brw_page **ppga;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);

        for (i = 0; i < count; i++)

static void osc_release_ppga(struct brw_page **ppga, obd_count count)

        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)

        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;

                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)

        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if a single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);

                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                        OBD_FREE(copy, sizeof(*copy) * pages_per_brw);

                        /* we passed it to async_internal() which is
                         * now responsible for releasing the memory */

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);
static void osc_check_rpcs(struct client_obd *cli);

/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting. Writeback completes or truncate happens before
 * writing starts. Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,

        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
/* This maintains the lists of pending pages to read/write for a given object
 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,

        if (lop->lop_num_pending == 0)

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages. recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)

        /* stream rpcs in queue order as long as there is an urgent page
         * queued. this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space. as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */

        if (lop->lop_num_pending >= optimal)
static void on_list(struct list_head *item, struct list_head *list,

        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)

        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)

        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;

                cli->cl_pending_r_pages += delta;
/* this is called when a sync waiter receives an interruption. Its job is to
 * get the caller woken as soon as possible. If its page hasn't been put in an
 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
 * desiring interruption, which will forcefully complete the rpc once the rpc
 * has completed. */
static void osc_occ_interrupted(struct oig_callback_context *occ)

        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;

        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
/* this is trying to propagate async writeback errors back up to the
 * application. As an async write fails we record the error code for later if
 * the app does an fsync. As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,

                ar->ar_force_sync = 1;
                ar->ar_min_xid = ptlrpc_sample_next_xid();

        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
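/*
 * Worked example (illustrative, not original code): if a write RPC with
 * xid 100 fails, osc_process_ar() latches ar_force_sync and records the
 * next xid to be allocated (say 101) in ar_min_xid.  Writes are then
 * forced synchronous until an RPC with xid >= 101 - one issued after
 * the failure was observed - succeeds, clearing ar_force_sync so cached
 * writeback can resume.
 */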
static void osc_oap_to_pending(struct osc_async_page *oap)

        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;

                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
/* this must be called holding the loi list lock to give coverage to
 * exit_cache, async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)

        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;

                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */

                /* upper layer wants to leave the page on the pending queue */
                osc_oap_to_pending(oap);

                osc_exit_cache(cli, oap, sent);
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)

        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;

                cli->cl_r_in_flight--;

        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);

                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */

                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)

        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);

                RETURN(ERR_PTR(-ENOMEM));

                GOTO(out, req = ERR_PTR(-ENOMEM));

        list_for_each_entry(oap, rpc_list, oap_rpc_item) {

                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;

                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,

                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST). If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps. Sadly, there is no obvious
         * way to do this in a single call. bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

        OBD_FREE(pga, sizeof(*pga) * page_count);
2162 /* the loi lock is held across this function but it's allowed to release
2163 * and reacquire it during its work */
2165 * prepare pages for ASYNC io and put pages in send queue.
2169 * \param cmd - OBD_BRW_* macroses
2170 * \param lop - pending pages
2172 * \return zero if pages successfully add to send queue.
2173 * \return not zere if error occurring.
2175 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2176 int cmd, struct loi_oap_pages *lop)
2178 struct ptlrpc_request *req;
2179 obd_count page_count = 0;
2180 struct osc_async_page *oap = NULL, *tmp;
2181 struct osc_brw_async_args *aa;
2182 struct obd_async_page_ops *ops;
2183 CFS_LIST_HEAD(rpc_list);
2184 unsigned int ending_offset;
2185 unsigned starting_offset = 0;
2189 /* first we find the pages we're allowed to work with */
2190 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2192 ops = oap->oap_caller_ops;
2194 LASSERT(oap->oap_magic == OAP_MAGIC);
2196 if (page_count != 0 &&
2197 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2198 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2199 " oap %p, page %p, srvlock %u\n",
2200 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
/* in llite being 'ready' equates to the page being locked
 * until completion unlocks it.  commit_write submits a page
 * as not ready because its unlock will happen unconditionally
 * as the call returns.  if we race with commit_write giving
 * us that page we don't want to create a hole in the page
 * stream, so we stop and leave the rpc to be fired by
 * another dirtier or kupdated interval (the not-ready page
 * will still be on the dirty list).  we could call in
 * at the end of ll_file_write to process the queue again. */
2212 if (!(oap->oap_async_flags & ASYNC_READY)) {
2213 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2215 CDEBUG(D_INODE, "oap %p page %p returned %d "
2216 "instead of ready\n", oap,
2220 /* llite is telling us that the page is still
2221 * in commit_write and that we should try
2222 * and put it in an rpc again later. we
2223 * break out of the loop so we don't create
2224 * a hole in the sequence of pages in the rpc
2229 /* the io isn't needed.. tell the checks
2230 * below to complete the rpc with EINTR */
2231 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2232 oap->oap_count = -EINTR;
2235 oap->oap_async_flags |= ASYNC_READY;
2238 LASSERTF(0, "oap %p page %p returned %d "
2239 "from make_ready\n", oap,
* Page submitted for IO has to be locked, either by
* ->ap_make_ready() or by higher layers.
2250 #if defined(__KERNEL__) && defined(__linux__)
2251 if(!(PageLocked(oap->oap_page) &&
2252 (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2253 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2254 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2258 /* If there is a gap at the start of this page, it can't merge
2259 * with any previous page, so we'll hand the network a
2260 * "fragmented" page array that it can't transfer in 1 RDMA */
2261 if (page_count != 0 && oap->oap_page_off != 0)
2264 /* take the page out of our book-keeping */
2265 list_del_init(&oap->oap_pending_item);
2266 lop_update_pending(cli, lop, cmd, -1);
2267 list_del_init(&oap->oap_urgent_item);
2269 if (page_count == 0)
2270 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2271 (PTLRPC_MAX_BRW_SIZE - 1);
2273 /* ask the caller for the size of the io as the rpc leaves. */
2274 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2276 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2277 if (oap->oap_count <= 0) {
2278 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2280 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2284 /* now put the page back in our accounting */
2285 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2286 if (page_count == 0)
2287 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2288 if (++page_count >= cli->cl_max_pages_per_rpc)
2291 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2292 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2293 * have the same alignment as the initial writes that allocated
2294 * extents on the server. */
2295 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2296 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2297 if (ending_offset == 0)
2300 /* If there is a gap at the end of this page, it can't merge
2301 * with any subsequent pages, so we'll hand the network a
2302 * "fragmented" page array that it can't transfer in 1 RDMA */
2303 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2307 osc_wake_cache_waiters(cli);
2309 if (page_count == 0)
2312 loi_list_maint(cli, loi);
2314 client_obd_list_unlock(&cli->cl_loi_list_lock);
2316 req = osc_build_req(cli, &rpc_list, page_count, cmd);
/* this should happen rarely and is pretty bad; it makes the
 * pending list not follow the dirty order */
2320 client_obd_list_lock(&cli->cl_loi_list_lock);
2321 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2322 list_del_init(&oap->oap_rpc_item);
/* queued sync pages can be torn down while they
 * were between the pending list and the rpc */
2326 if (oap->oap_interrupted) {
2327 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2328 osc_ap_completion(cli, NULL, oap, 0,
2332 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2334 loi_list_maint(cli, loi);
2335 RETURN(PTR_ERR(req));
2338 aa = ptlrpc_req_async_args(req);
2340 if (cmd == OBD_BRW_READ) {
2341 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2342 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2343 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2344 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2345 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2347 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2348 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2349 cli->cl_w_in_flight);
2350 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2351 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2352 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2355 client_obd_list_lock(&cli->cl_loi_list_lock);
2357 if (cmd == OBD_BRW_READ)
2358 cli->cl_r_in_flight++;
2360 cli->cl_w_in_flight++;
/* queued sync pages can be torn down while they
 * were between the pending list and the rpc */
2365 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2366 /* only one oap gets a request reference */
2369 if (oap->oap_interrupted && !req->rq_intr) {
2370 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2372 ptlrpc_mark_interrupted(req);
2376 tmp->oap_request = ptlrpc_request_addref(req);
2378 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2379 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2381 req->rq_interpret_reply = brw_interpret;
2382 ptlrpcd_add_req(req);
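/*
 * Illustrative sketch only (kept out of the build): the masking used
 * above when deciding where an RPC may end.  PTLRPC_MAX_BRW_SIZE is a
 * power of two, so "x & (PTLRPC_MAX_BRW_SIZE - 1)" is x modulo the
 * maximum transfer size; a result of 0 for the byte just past a page's
 * data means the RPC would end exactly on a transfer-size boundary.
 */
#if 0
static inline obd_off brw_boundary_offset(obd_off obj_off, obd_off page_off,
                                          int count)
{
        /* offset, within the current transfer window, of the first byte
         * past this page's data; 0 means "aligned, end the RPC here" */
        return (obj_off + page_off + count) & (PTLRPC_MAX_BRW_SIZE - 1);
}
#endif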
2386 #define LOI_DEBUG(LOI, STR, args...) \
2387 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2388 !list_empty(&(LOI)->loi_cli_item), \
2389 (LOI)->loi_write_lop.lop_num_pending, \
2390 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2391 (LOI)->loi_read_lop.lop_num_pending, \
2392 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2395 /* This is called by osc_check_rpcs() to find which objects have pages that
2396 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2397 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2400 /* first return all objects which we already know to have
2401 * pages ready to be stuffed into rpcs */
2402 if (!list_empty(&cli->cl_loi_ready_list))
2403 RETURN(list_entry(cli->cl_loi_ready_list.next,
2404 struct lov_oinfo, loi_cli_item));
/* then if we have cache waiters, return all objects with queued
 * writes.  This is especially important when many small files
 * have filled up the cache and not been fired into rpcs because
 * they don't pass the nr_pending/object threshold */
2410 if (!list_empty(&cli->cl_cache_waiters) &&
2411 !list_empty(&cli->cl_loi_write_list))
2412 RETURN(list_entry(cli->cl_loi_write_list.next,
2413 struct lov_oinfo, loi_write_item));
2415 /* then return all queued objects when we have an invalid import
2416 * so that they get flushed */
2417 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2418 if (!list_empty(&cli->cl_loi_write_list))
2419 RETURN(list_entry(cli->cl_loi_write_list.next,
2420 struct lov_oinfo, loi_write_item));
2421 if (!list_empty(&cli->cl_loi_read_list))
2422 RETURN(list_entry(cli->cl_loi_read_list.next,
2423 struct lov_oinfo, loi_read_item));
2428 /* called with the loi list lock held */
2429 static void osc_check_rpcs(struct client_obd *cli)
2431 struct lov_oinfo *loi;
2432 int rc = 0, race_counter = 0;
2435 while ((loi = osc_next_loi(cli)) != NULL) {
2436 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2438 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
/* attempt some read/write balancing by alternating between
 * reads and writes in an object.  The makes_rpc checks here
 * would be redundant if we were getting read/write work items
 * instead of objects.  we don't want send_oap_rpc to drain a
 * partial read pending queue when we're given this object to
 * do write io on while there are cache waiters */
2447 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2448 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2449 &loi->loi_write_lop);
2457 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2458 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2459 &loi->loi_read_lop);
/* attempt some inter-object balancing by issuing rpcs
 * for each object in turn */
2470 if (!list_empty(&loi->loi_cli_item))
2471 list_del_init(&loi->loi_cli_item);
2472 if (!list_empty(&loi->loi_write_item))
2473 list_del_init(&loi->loi_write_item);
2474 if (!list_empty(&loi->loi_read_item))
2475 list_del_init(&loi->loi_read_item);
2477 loi_list_maint(cli, loi);
2479 /* send_oap_rpc fails with 0 when make_ready tells it to
2480 * back off. llite's make_ready does this when it tries
2481 * to lock a page queued for write that is already locked.
2482 * we want to try sending rpcs from many objects, but we
2483 * don't want to spin failing with 0. */
2484 if (race_counter == 10)
/* we're trying to queue a page in the osc so we're subject to the
 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
 * If the osc's queued pages are already at that limit, then we want to sleep
 * until there is space in the osc's queue for us.  We also may be waiting for
 * write credits from the OST if there are RPCs in flight that may return some
 * before we fall back to sync writes.
 *
 * We need this to know whether our allocation was granted in the presence of
 * signals */
2498 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2502 client_obd_list_lock(&cli->cl_loi_list_lock);
2503 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2504 client_obd_list_unlock(&cli->cl_loi_list_lock);
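/*
 * For illustration only, a hedged restatement of the admission test
 * osc_enter_cache() applies below, pulled out as a predicate (this
 * helper is hypothetical and not used anywhere): a page may enter the
 * cache when the per-client dirty limit, the global dirty limit and
 * the available OST grant all have room for one more page.
 */
#if 0
static inline int osc_cache_has_room(struct client_obd *cli)
{
        return cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
               atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
               cli->cl_avail_grant >= CFS_PAGE_SIZE;
}
#endif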
2508 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2509 * grant or cache space. */
2510 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2511 struct osc_async_page *oap)
2513 struct osc_cache_waiter ocw;
2514 struct l_wait_info lwi = { 0 };
2518 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2519 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2520 cli->cl_dirty_max, obd_max_dirty_pages,
2521 cli->cl_lost_grant, cli->cl_avail_grant);
2523 /* force the caller to try sync io. this can jump the list
2524 * of queued writes and create a discontiguous rpc stream */
2525 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2526 loi->loi_ar.ar_force_sync)
2529 /* Hopefully normal case - cache space and write credits available */
2530 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2531 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2532 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2533 /* account for ourselves */
2534 osc_consume_write_grant(cli, &oap->oap_brw_page);
2538 /* Make sure that there are write rpcs in flight to wait for. This
2539 * is a little silly as this object may not have any pending but
2540 * other objects sure might. */
2541 if (cli->cl_w_in_flight) {
2542 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2543 cfs_waitq_init(&ocw.ocw_waitq);
2547 loi_list_maint(cli, loi);
2548 osc_check_rpcs(cli);
2549 client_obd_list_unlock(&cli->cl_loi_list_lock);
2551 CDEBUG(D_CACHE, "sleeping for cache space\n");
2552 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2554 client_obd_list_lock(&cli->cl_loi_list_lock);
2555 if (!list_empty(&ocw.ocw_entry)) {
2556 list_del(&ocw.ocw_entry);
2566 * Checks if requested extent lock is compatible with a lock under the page.
2568 * Checks if the lock under \a page is compatible with a read or write lock
2569 * (specified by \a rw) for an extent [\a start , \a end].
2571 * \param exp osc export
2572 * \param lsm striping information for the file
2573 * \param res osc_async_page placeholder
2574 * \param rw OBD_BRW_READ if requested for reading,
2575 * OBD_BRW_WRITE if requested for writing
2576 * \param start start of the requested extent
2577 * \param end end of the requested extent
2578 * \param cookie transparent parameter for passing locking context
2580 * \post result == 1, *cookie == context, appropriate lock is referenced or
2583 * \retval 1 owned lock is reused for the request
2584 * \retval 0 no lock reused for the request
2586 * \see osc_release_short_lock
2588 static int osc_reget_short_lock(struct obd_export *exp,
2589 struct lov_stripe_md *lsm,
2591 obd_off start, obd_off end,
2594 struct osc_async_page *oap = *res;
2599 spin_lock(&oap->oap_lock);
2600 rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2601 start, end, cookie);
2602 spin_unlock(&oap->oap_lock);
2608 * Releases a reference to a lock taken in a "fast" way.
2610 * Releases a read or a write (specified by \a rw) lock
2611 * referenced by \a cookie.
2613 * \param exp osc export
2614 * \param lsm striping information for the file
2615 * \param end end of the locked extent
2616 * \param rw OBD_BRW_READ if requested for reading,
2617 * OBD_BRW_WRITE if requested for writing
2618 * \param cookie transparent parameter for passing locking context
2620 * \post appropriate lock is dereferenced
2622 * \see osc_reget_short_lock
2624 static int osc_release_short_lock(struct obd_export *exp,
2625 struct lov_stripe_md *lsm, obd_off end,
2626 void *cookie, int rw)
2629 ldlm_lock_fast_release(cookie, rw);
2630 /* no error could have happened at this layer */
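/*
 * Hypothetical usage sketch (kept out of the build): callers are
 * expected to pair the two entry points above, handing the cookie
 * filled in by a successful reget back to the release.  The argument
 * order follows the \param lists above; my_short_read() and its
 * error handling are illustrative assumptions, not part of this file.
 */
#if 0
static int my_short_read(struct obd_export *exp, struct lov_stripe_md *lsm,
                         void **res, obd_off start, obd_off end)
{
        void *cookie = NULL;

        if (osc_reget_short_lock(exp, lsm, res, OBD_BRW_READ,
                                 start, end, &cookie) != 1)
                return -ENOLCK;         /* no compatible lock to reuse */

        /* ... perform the read under the referenced lock ... */

        return osc_release_short_lock(exp, lsm, end, cookie, OBD_BRW_READ);
}
#endif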
2634 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2635 struct lov_oinfo *loi, cfs_page_t *page,
2636 obd_off offset, struct obd_async_page_ops *ops,
2637 void *data, void **res, int nocache,
2638 struct lustre_handle *lockh)
2640 struct osc_async_page *oap;
2641 struct ldlm_res_id oid;
2646 return size_round(sizeof(*oap));
2649 oap->oap_magic = OAP_MAGIC;
2650 oap->oap_cli = &exp->exp_obd->u.cli;
2653 oap->oap_caller_ops = ops;
2654 oap->oap_caller_data = data;
2656 oap->oap_page = page;
2657 oap->oap_obj_off = offset;
2659 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2660 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2661 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2662 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2664 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2666 spin_lock_init(&oap->oap_lock);
/* If the page was marked as not cacheable - don't add it to any locks */
2670 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
/* This is the only place where we can call cache_add_extent
   without oap_lock, because this page is locked now, and
   the lock we are adding it to is referenced, so it cannot lose
   any pages either. */
2675 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2680 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2684 struct osc_async_page *oap_from_cookie(void *cookie)
2686 struct osc_async_page *oap = cookie;
2687 if (oap->oap_magic != OAP_MAGIC)
2688 return ERR_PTR(-EINVAL);
2692 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2693 struct lov_oinfo *loi, void *cookie,
2694 int cmd, obd_off off, int count,
2695 obd_flag brw_flags, enum async_flags async_flags)
2697 struct client_obd *cli = &exp->exp_obd->u.cli;
2698 struct osc_async_page *oap;
2702 oap = oap_from_cookie(cookie);
2704 RETURN(PTR_ERR(oap));
2706 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2709 if (!list_empty(&oap->oap_pending_item) ||
2710 !list_empty(&oap->oap_urgent_item) ||
2711 !list_empty(&oap->oap_rpc_item))
2714 /* check if the file's owner/group is over quota */
2715 #ifdef HAVE_QUOTA_SUPPORT
2716 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2717 struct obd_async_page_ops *ops;
2724 ops = oap->oap_caller_ops;
2725 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2726 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2737 loi = lsm->lsm_oinfo[0];
2739 client_obd_list_lock(&cli->cl_loi_list_lock);
2742 oap->oap_page_off = off;
2743 oap->oap_count = count;
2744 oap->oap_brw_flags = brw_flags;
2745 oap->oap_async_flags = async_flags;
2747 if (cmd & OBD_BRW_WRITE) {
2748 rc = osc_enter_cache(cli, loi, oap);
2750 client_obd_list_unlock(&cli->cl_loi_list_lock);
2755 osc_oap_to_pending(oap);
2756 loi_list_maint(cli, loi);
2758 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2761 osc_check_rpcs(cli);
2762 client_obd_list_unlock(&cli->cl_loi_list_lock);
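/*
 * Hypothetical caller sketch (kept out of the build): the async-page
 * life cycle exported through the obd_ops table at the end of this
 * file is prep -> queue -> completion (ops->ap_completion) -> teardown.
 * my_queue_write(), my_ops and my_data are assumptions standing in for
 * the llite callbacks and caller state; teardown and error paths are
 * elided.
 */
#if 0
static int my_queue_write(struct obd_export *exp, struct lov_stripe_md *lsm,
                          cfs_page_t *page, obd_off offset,
                          struct obd_async_page_ops *my_ops, void *my_data)
{
        void *cookie;
        int size, rc;

        /* called with a NULL page, prep only reports how many bytes of
         * per-page state the caller must allocate for the cookie */
        size = osc_prep_async_page(exp, lsm, NULL, NULL, 0, NULL, NULL,
                                   NULL, 0, NULL);
        OBD_ALLOC(cookie, size);
        if (cookie == NULL)
                return -ENOMEM;

        /* the second call initializes our storage as the osc_async_page */
        rc = osc_prep_async_page(exp, lsm, NULL, page, offset,
                                 my_ops, my_data, &cookie, 0, NULL);
        if (rc)
                return rc;

        /* queue a full-page write; flags such as ASYNC_READY can be set
         * later through osc_set_async_flags() once the page may be sent */
        return osc_queue_async_io(exp, lsm, NULL, cookie, OBD_BRW_WRITE,
                                  0, CFS_PAGE_SIZE, 0, 0);
}
#endif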
/* aka (~was & now & flag), but this is clearer :) */
2768 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
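/* For illustration: SETTING(0, ASYNC_READY, ASYNC_READY) is non-zero (the
 * flag is being newly set), while SETTING(ASYNC_READY, ASYNC_READY,
 * ASYNC_READY) is zero (it was already set, so there is nothing to do). */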
2770 static int osc_set_async_flags(struct obd_export *exp,
2771 struct lov_stripe_md *lsm,
2772 struct lov_oinfo *loi, void *cookie,
2773 obd_flag async_flags)
2775 struct client_obd *cli = &exp->exp_obd->u.cli;
2776 struct loi_oap_pages *lop;
2777 struct osc_async_page *oap;
2781 oap = oap_from_cookie(cookie);
2783 RETURN(PTR_ERR(oap));
* bug 7311: OST-side locking is only supported by liblustre for now
* (and liblustre never calls obd_set_async_flags(). I hope.); a generic
* implementation has to handle the case where an OST-locked page was
* picked up by, e.g., ->writepage().
2791 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to tread here */
2795 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2799 loi = lsm->lsm_oinfo[0];
2801 if (oap->oap_cmd & OBD_BRW_WRITE) {
2802 lop = &loi->loi_write_lop;
2804 lop = &loi->loi_read_lop;
2807 client_obd_list_lock(&cli->cl_loi_list_lock);
2809 if (list_empty(&oap->oap_pending_item))
2810 GOTO(out, rc = -EINVAL);
2812 if ((oap->oap_async_flags & async_flags) == async_flags)
2815 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2816 oap->oap_async_flags |= ASYNC_READY;
2818 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2819 if (list_empty(&oap->oap_rpc_item)) {
2820 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2821 loi_list_maint(cli, loi);
2825 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2826 oap->oap_async_flags);
2828 osc_check_rpcs(cli);
2829 client_obd_list_unlock(&cli->cl_loi_list_lock);
2833 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2834 struct lov_oinfo *loi,
2835 struct obd_io_group *oig, void *cookie,
2836 int cmd, obd_off off, int count,
2838 obd_flag async_flags)
2840 struct client_obd *cli = &exp->exp_obd->u.cli;
2841 struct osc_async_page *oap;
2842 struct loi_oap_pages *lop;
2846 oap = oap_from_cookie(cookie);
2848 RETURN(PTR_ERR(oap));
2850 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2853 if (!list_empty(&oap->oap_pending_item) ||
2854 !list_empty(&oap->oap_urgent_item) ||
2855 !list_empty(&oap->oap_rpc_item))
2859 loi = lsm->lsm_oinfo[0];
2861 client_obd_list_lock(&cli->cl_loi_list_lock);
2864 oap->oap_page_off = off;
2865 oap->oap_count = count;
2866 oap->oap_brw_flags = brw_flags;
2867 oap->oap_async_flags = async_flags;
2869 if (cmd & OBD_BRW_WRITE)
2870 lop = &loi->loi_write_lop;
2872 lop = &loi->loi_read_lop;
2874 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2875 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2877 rc = oig_add_one(oig, &oap->oap_occ);
2880 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2881 oap, oap->oap_page, rc);
2883 client_obd_list_unlock(&cli->cl_loi_list_lock);
2888 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2889 struct loi_oap_pages *lop, int cmd)
2891 struct list_head *pos, *tmp;
2892 struct osc_async_page *oap;
2894 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2895 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2896 list_del(&oap->oap_pending_item);
2897 osc_oap_to_pending(oap);
2899 loi_list_maint(cli, loi);
2902 static int osc_trigger_group_io(struct obd_export *exp,
2903 struct lov_stripe_md *lsm,
2904 struct lov_oinfo *loi,
2905 struct obd_io_group *oig)
2907 struct client_obd *cli = &exp->exp_obd->u.cli;
2911 loi = lsm->lsm_oinfo[0];
2913 client_obd_list_lock(&cli->cl_loi_list_lock);
2915 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2916 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2918 osc_check_rpcs(cli);
2919 client_obd_list_unlock(&cli->cl_loi_list_lock);
2924 static int osc_teardown_async_page(struct obd_export *exp,
2925 struct lov_stripe_md *lsm,
2926 struct lov_oinfo *loi, void *cookie)
2928 struct client_obd *cli = &exp->exp_obd->u.cli;
2929 struct loi_oap_pages *lop;
2930 struct osc_async_page *oap;
2934 oap = oap_from_cookie(cookie);
2936 RETURN(PTR_ERR(oap));
2939 loi = lsm->lsm_oinfo[0];
2941 if (oap->oap_cmd & OBD_BRW_WRITE) {
2942 lop = &loi->loi_write_lop;
2944 lop = &loi->loi_read_lop;
2947 client_obd_list_lock(&cli->cl_loi_list_lock);
2949 if (!list_empty(&oap->oap_rpc_item))
2950 GOTO(out, rc = -EBUSY);
2952 osc_exit_cache(cli, oap, 0);
2953 osc_wake_cache_waiters(cli);
2955 if (!list_empty(&oap->oap_urgent_item)) {
2956 list_del_init(&oap->oap_urgent_item);
2957 oap->oap_async_flags &= ~ASYNC_URGENT;
2959 if (!list_empty(&oap->oap_pending_item)) {
2960 list_del_init(&oap->oap_pending_item);
2961 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2963 loi_list_maint(cli, loi);
2964 cache_remove_extent(cli->cl_cache, oap);
2966 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2968 client_obd_list_unlock(&cli->cl_loi_list_lock);
2972 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2973 struct ldlm_lock_desc *new, void *data,
2976 struct lustre_handle lockh = { 0 };
2980 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2981 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2986 case LDLM_CB_BLOCKING:
2987 ldlm_lock2handle(lock, &lockh);
2988 rc = ldlm_cli_cancel(&lockh);
2990 CERROR("ldlm_cli_cancel failed: %d\n", rc);
2992 case LDLM_CB_CANCELING: {
2994 ldlm_lock2handle(lock, &lockh);
2995 /* This lock wasn't granted, don't try to do anything */
2996 if (lock->l_req_mode != lock->l_granted_mode)
2999 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3002 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3003 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3004 lock, new, data,flag);
3013 EXPORT_SYMBOL(osc_extent_blocking_cb);
3015 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3018 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3021 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3024 lock_res_and_lock(lock);
3025 #if defined (__KERNEL__) && defined (__linux__)
3026 /* Liang XXX: Darwin and Winnt checking should be added */
3027 if (lock->l_ast_data && lock->l_ast_data != data) {
3028 struct inode *new_inode = data;
3029 struct inode *old_inode = lock->l_ast_data;
3030 if (!(old_inode->i_state & I_FREEING))
3031 LDLM_ERROR(lock, "inconsistent l_ast_data found");
3032 LASSERTF(old_inode->i_state & I_FREEING,
3033 "Found existing inode %p/%lu/%u state %lu in lock: "
3034 "setting data to %p/%lu/%u\n", old_inode,
3035 old_inode->i_ino, old_inode->i_generation,
3037 new_inode, new_inode->i_ino, new_inode->i_generation);
3040 lock->l_ast_data = data;
3041 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3042 unlock_res_and_lock(lock);
3043 LDLM_LOCK_PUT(lock);
3046 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3047 ldlm_iterator_t replace, void *data)
3049 struct ldlm_res_id res_id;
3050 struct obd_device *obd = class_exp2obd(exp);
3052 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3053 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3057 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3058 struct obd_info *oinfo, int intent, int rc)
3063 /* The request was created before ldlm_cli_enqueue call. */
3064 if (rc == ELDLM_LOCK_ABORTED) {
3065 struct ldlm_reply *rep;
3066 rep = req_capsule_server_get(&req->rq_pill,
3069 LASSERT(rep != NULL);
3070 if (rep->lock_policy_res1)
3071 rc = rep->lock_policy_res1;
3075 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3076 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3077 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3078 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3079 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3083 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3085 /* Call the update callback. */
3086 rc = oinfo->oi_cb_up(oinfo, rc);
3090 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3091 struct osc_enqueue_args *aa, int rc)
3093 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3094 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3095 struct ldlm_lock *lock;
3097 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3099 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
/* Complete the procedure of obtaining the lock. */
3102 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3104 &aa->oa_oi->oi_flags,
3105 &lsm->lsm_oinfo[0]->loi_lvb,
3106 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3107 lustre_swab_ost_lvb,
3108 aa->oa_oi->oi_lockh, rc);
3110 /* Complete osc stuff. */
3111 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
/* Release the lock for an async request. */
3114 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3115 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3117 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3118 aa->oa_oi->oi_lockh, req, aa);
3119 LDLM_LOCK_PUT(lock);
/* When enqueuing asynchronously, locks are not ordered, so we can obtain a
 * lock from the 2nd OSC before a lock from the 1st one.  This does not
 * deadlock with other synchronous requests; however, keeping some locks and
 * trying to obtain others may take a considerable amount of time in the case
 * of OST failure, and when other sync requests do not get the released lock
 * from a client, the client is excluded from the cluster -- such scenarios
 * make life difficult, so release locks just after they are obtained. */
3130 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3131 struct ldlm_enqueue_info *einfo,
3132 struct ptlrpc_request_set *rqset)
3134 struct ldlm_res_id res_id;
3135 struct obd_device *obd = exp->exp_obd;
3136 struct ptlrpc_request *req = NULL;
3137 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3143 osc_build_res_name(oinfo->oi_md->lsm_object_id,
3144 oinfo->oi_md->lsm_object_gr, &res_id);
3145 /* Filesystem lock extents are extended to page boundaries so that
3146 * dealing with the page cache is a little smoother. */
3147 oinfo->oi_policy.l_extent.start -=
3148 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3149 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
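        /* Worked example (assuming CFS_PAGE_SIZE is 4096, so ~CFS_PAGE_MASK
         * is 4095): a requested extent of [5000, 9000] is widened to
         * [4096, 12287], i.e. rounded down and up to whole page boundaries. */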
3151 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3154 /* Next, search for already existing extent locks that will cover us */
3155 /* If we're trying to read, we also search for an existing PW lock. The
3156 * VFS and page cache already protect us locally, so lots of readers/
3157 * writers can share a single PW lock.
3159 * There are problems with conversion deadlocks, so instead of
* converting a read lock to a write lock, we'll just enqueue a new one.
3163 * At some point we should cancel the read lock instead of making them
3164 * send us a blocking callback, but there are problems with canceling
3165 * locks out from other users right now, too. */
3166 mode = einfo->ei_mode;
3167 if (einfo->ei_mode == LCK_PR)
3169 mode = ldlm_lock_match(obd->obd_namespace,
3170 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3171 einfo->ei_type, &oinfo->oi_policy, mode,
/* addref the lock only if this is not an async request and a PW lock
 * was matched whereas we asked for PR. */
3176 if (!rqset && einfo->ei_mode != mode)
3177 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3178 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3181 /* I would like to be able to ASSERT here that rss <=
3182 * kms, but I can't, for reasons which are explained in
3186 /* We already have a lock, and it's referenced */
3187 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3189 /* For async requests, decref the lock. */
3190 if (einfo->ei_mode != mode)
3191 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3193 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3200 CFS_LIST_HEAD(cancels);
3201 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3202 &RQF_LDLM_ENQUEUE_LVB);
3206 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3210 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3211 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3212 ptlrpc_request_set_replen(req);
3215 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3216 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3218 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3219 &oinfo->oi_policy, &oinfo->oi_flags,
3220 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3221 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3222 lustre_swab_ost_lvb, oinfo->oi_lockh,
3226 struct osc_enqueue_args *aa;
3227 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3228 aa = ptlrpc_req_async_args(req);
3233 req->rq_interpret_reply = osc_enqueue_interpret;
3234 ptlrpc_set_add_req(rqset, req);
3235 } else if (intent) {
3236 ptlrpc_req_finished(req);
3241 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3243 ptlrpc_req_finished(req);
3248 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3249 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3250 int *flags, void *data, struct lustre_handle *lockh)
3252 struct ldlm_res_id res_id;
3253 struct obd_device *obd = exp->exp_obd;
3254 int lflags = *flags;
3258 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3260 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3263 /* Filesystem lock extents are extended to page boundaries so that
3264 * dealing with the page cache is a little smoother */
3265 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3266 policy->l_extent.end |= ~CFS_PAGE_MASK;
3268 /* Next, search for already existing extent locks that will cover us */
3269 /* If we're trying to read, we also search for an existing PW lock. The
3270 * VFS and page cache already protect us locally, so lots of readers/
3271 * writers can share a single PW lock. */
3275 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3276 &res_id, type, policy, rc, lockh);
3278 osc_set_data_with_check(lockh, data, lflags);
3279 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3280 ldlm_lock_addref(lockh, LCK_PR);
3281 ldlm_lock_decref(lockh, LCK_PW);
3288 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3289 __u32 mode, struct lustre_handle *lockh)
3293 if (unlikely(mode == LCK_GROUP))
3294 ldlm_lock_decref_and_cancel(lockh, mode);
3296 ldlm_lock_decref(lockh, mode);
3301 static int osc_cancel_unused(struct obd_export *exp,
3302 struct lov_stripe_md *lsm, int flags,
3305 struct obd_device *obd = class_exp2obd(exp);
3306 struct ldlm_res_id res_id, *resp = NULL;
3309 resp = osc_build_res_name(lsm->lsm_object_id,
3310 lsm->lsm_object_gr, &res_id);
3313 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3316 static int osc_join_lru(struct obd_export *exp,
3317 struct lov_stripe_md *lsm, int join)
3319 struct obd_device *obd = class_exp2obd(exp);
3320 struct ldlm_res_id res_id, *resp = NULL;
3323 resp = osc_build_res_name(lsm->lsm_object_id,
3324 lsm->lsm_object_gr, &res_id);
3327 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3330 static int osc_statfs_interpret(struct ptlrpc_request *req,
3331 struct osc_async_args *aa, int rc)
3333 struct obd_statfs *msfs;
3339 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3341 GOTO(out, rc = -EPROTO);
3344 *aa->aa_oi->oi_osfs = *msfs;
3346 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3350 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3351 __u64 max_age, struct ptlrpc_request_set *rqset)
3353 struct ptlrpc_request *req;
3354 struct osc_async_args *aa;
3358 /* We could possibly pass max_age in the request (as an absolute
3359 * timestamp or a "seconds.usec ago") so the target can avoid doing
3360 * extra calls into the filesystem if that isn't necessary (e.g.
3361 * during mount that would help a bit). Having relative timestamps
3362 * is not so great if request processing is slow, while absolute
3363 * timestamps are not ideal because they need time synchronization. */
3364 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3368 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3370 ptlrpc_request_free(req);
3373 ptlrpc_request_set_replen(req);
3374 req->rq_request_portal = OST_CREATE_PORTAL;
3375 ptlrpc_at_set_req_timeout(req);
3377 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
/* procfs requests should not wait for statfs, to avoid deadlocks */
3379 req->rq_no_resend = 1;
3380 req->rq_no_delay = 1;
3383 req->rq_interpret_reply = osc_statfs_interpret;
3384 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3385 aa = ptlrpc_req_async_args(req);
3388 ptlrpc_set_add_req(rqset, req);
3392 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3393 __u64 max_age, __u32 flags)
3395 struct obd_statfs *msfs;
3396 struct ptlrpc_request *req;
3397 struct obd_import *imp = NULL;
/* Since the request might also come from lprocfs, we need to
 * sync this with client_disconnect_export (bug 15684) */
3403 down_read(&obd->u.cli.cl_sem);
3404 if (obd->u.cli.cl_import)
3405 imp = class_import_get(obd->u.cli.cl_import);
3406 up_read(&obd->u.cli.cl_sem);
3410 /* We could possibly pass max_age in the request (as an absolute
3411 * timestamp or a "seconds.usec ago") so the target can avoid doing
3412 * extra calls into the filesystem if that isn't necessary (e.g.
3413 * during mount that would help a bit). Having relative timestamps
3414 * is not so great if request processing is slow, while absolute
3415 * timestamps are not ideal because they need time synchronization. */
3416 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3418 class_import_put(imp);
3423 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3425 ptlrpc_request_free(req);
3428 ptlrpc_request_set_replen(req);
3429 req->rq_request_portal = OST_CREATE_PORTAL;
3430 ptlrpc_at_set_req_timeout(req);
3432 if (flags & OBD_STATFS_NODELAY) {
/* procfs requests should not wait for statfs, to avoid deadlocks */
3434 req->rq_no_resend = 1;
3435 req->rq_no_delay = 1;
3438 rc = ptlrpc_queue_wait(req);
3442 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3444 GOTO(out, rc = -EPROTO);
3451 ptlrpc_req_finished(req);
3455 /* Retrieve object striping information.
* @lump is a pointer to an in-core struct with lmm_ost_count indicating
3458 * the maximum number of OST indices which will fit in the user buffer.
3459 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3461 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3463 struct lov_user_md lum, *lumk;
3464 int rc = 0, lum_size;
3470 if (copy_from_user(&lum, lump, sizeof(lum)))
3473 if (lum.lmm_magic != LOV_USER_MAGIC)
3476 if (lum.lmm_stripe_count > 0) {
3477 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3478 OBD_ALLOC(lumk, lum_size);
3482 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3483 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3485 lum_size = sizeof(lum);
3489 lumk->lmm_object_id = lsm->lsm_object_id;
3490 lumk->lmm_object_gr = lsm->lsm_object_gr;
3491 lumk->lmm_stripe_count = 1;
3493 if (copy_to_user(lump, lumk, lum_size))
3497 OBD_FREE(lumk, lum_size);
3503 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3504 void *karg, void *uarg)
3506 struct obd_device *obd = exp->exp_obd;
3507 struct obd_ioctl_data *data = karg;
3511 if (!try_module_get(THIS_MODULE)) {
3512 CERROR("Can't get module. Is it alive?");
3516 case OBD_IOC_LOV_GET_CONFIG: {
3518 struct lov_desc *desc;
3519 struct obd_uuid uuid;
3523 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3524 GOTO(out, err = -EINVAL);
3526 data = (struct obd_ioctl_data *)buf;
3528 if (sizeof(*desc) > data->ioc_inllen1) {
3529 obd_ioctl_freedata(buf, len);
3530 GOTO(out, err = -EINVAL);
3533 if (data->ioc_inllen2 < sizeof(uuid)) {
3534 obd_ioctl_freedata(buf, len);
3535 GOTO(out, err = -EINVAL);
3538 desc = (struct lov_desc *)data->ioc_inlbuf1;
3539 desc->ld_tgt_count = 1;
3540 desc->ld_active_tgt_count = 1;
3541 desc->ld_default_stripe_count = 1;
3542 desc->ld_default_stripe_size = 0;
3543 desc->ld_default_stripe_offset = 0;
3544 desc->ld_pattern = 0;
3545 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3547 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3549 err = copy_to_user((void *)uarg, buf, len);
3552 obd_ioctl_freedata(buf, len);
3555 case LL_IOC_LOV_SETSTRIPE:
3556 err = obd_alloc_memmd(exp, karg);
3560 case LL_IOC_LOV_GETSTRIPE:
3561 err = osc_getstripe(karg, uarg);
3563 case OBD_IOC_CLIENT_RECOVER:
3564 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3569 case IOC_OSC_SET_ACTIVE:
3570 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3573 case OBD_IOC_POLL_QUOTACHECK:
3574 err = lquota_poll_check(quota_interface, exp,
3575 (struct if_quotacheck *)karg);
3578 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3579 cmd, cfs_curproc_comm());
3580 GOTO(out, err = -ENOTTY);
3583 module_put(THIS_MODULE);
3587 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3588 void *key, __u32 *vallen, void *val)
3591 if (!vallen || !val)
3594 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3595 __u32 *stripe = val;
3596 *vallen = sizeof(*stripe);
3599 } else if (KEY_IS(KEY_LAST_ID)) {
3600 struct ptlrpc_request *req;
3605 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3606 &RQF_OST_GET_INFO_LAST_ID);
3610 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3611 RCL_CLIENT, keylen);
3612 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3614 ptlrpc_request_free(req);
3618 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3619 memcpy(tmp, key, keylen);
3621 ptlrpc_request_set_replen(req);
3622 rc = ptlrpc_queue_wait(req);
3626 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3628 GOTO(out, rc = -EPROTO);
3630 *((obd_id *)val) = *reply;
3632 ptlrpc_req_finished(req);
3638 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3641 struct llog_ctxt *ctxt;
3642 struct obd_import *imp = req->rq_import;
3648 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3651 rc = llog_initiator_connect(ctxt);
3653 CERROR("cannot establish connection for "
3654 "ctxt %p: %d\n", ctxt, rc);
3657 llog_ctxt_put(ctxt);
3658 spin_lock(&imp->imp_lock);
3659 imp->imp_server_timeout = 1;
3660 imp->imp_pingable = 1;
3661 spin_unlock(&imp->imp_lock);
3662 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3667 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3668 void *key, obd_count vallen, void *val,
3669 struct ptlrpc_request_set *set)
3671 struct ptlrpc_request *req;
3672 struct obd_device *obd = exp->exp_obd;
3673 struct obd_import *imp = class_exp2cliimp(exp);
3678 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3680 if (KEY_IS(KEY_NEXT_ID)) {
3681 if (vallen != sizeof(obd_id))
3685 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3686 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3687 exp->exp_obd->obd_name,
3688 obd->u.cli.cl_oscc.oscc_next_id);
3693 if (KEY_IS(KEY_UNLINKED)) {
3694 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3695 spin_lock(&oscc->oscc_lock);
3696 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3697 spin_unlock(&oscc->oscc_lock);
3701 if (KEY_IS(KEY_INIT_RECOV)) {
3702 if (vallen != sizeof(int))
3704 spin_lock(&imp->imp_lock);
3705 imp->imp_initial_recov = *(int *)val;
3706 spin_unlock(&imp->imp_lock);
3707 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3708 exp->exp_obd->obd_name,
3709 imp->imp_initial_recov);
3713 if (KEY_IS(KEY_CHECKSUM)) {
3714 if (vallen != sizeof(int))
3716 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3720 if (KEY_IS(KEY_FLUSH_CTX)) {
3721 sptlrpc_import_flush_my_ctx(imp);
3728 /* We pass all other commands directly to OST. Since nobody calls osc
3729 methods directly and everybody is supposed to go through LOV, we
3730 assume lov checked invalid values for us.
3731 The only recognised values so far are evict_by_nid and mds_conn.
Even if something bad goes through, we'd get a -EINVAL from OST anyway. */
3736 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3740 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3741 RCL_CLIENT, keylen);
3742 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3743 RCL_CLIENT, vallen);
3744 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3746 ptlrpc_request_free(req);
3750 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3751 memcpy(tmp, key, keylen);
3752 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3753 memcpy(tmp, val, vallen);
3755 if (KEY_IS(KEY_MDS_CONN)) {
3756 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3758 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3759 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3760 LASSERT(oscc->oscc_oa.o_gr > 0);
3761 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3764 ptlrpc_request_set_replen(req);
3765 ptlrpc_set_add_req(set, req);
3766 ptlrpc_check_set(set);
3772 static struct llog_operations osc_size_repl_logops = {
3773 lop_cancel: llog_obd_repl_cancel
3776 static struct llog_operations osc_mds_ost_orig_logops;
3777 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3778 struct obd_device *tgt, int count,
3779 struct llog_catid *catid, struct obd_uuid *uuid)
3784 LASSERT(olg == &obd->obd_olg);
3785 spin_lock(&obd->obd_dev_lock);
3786 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3787 osc_mds_ost_orig_logops = llog_lvfs_ops;
3788 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3789 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3790 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3791 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3793 spin_unlock(&obd->obd_dev_lock);
3795 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3796 &catid->lci_logid, &osc_mds_ost_orig_logops);
3798 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3802 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3803 NULL, &osc_size_repl_logops);
3805 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3808 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3809 obd->obd_name, tgt->obd_name, count, catid, rc);
3810 CERROR("logid "LPX64":0x%x\n",
3811 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3816 static int osc_llog_finish(struct obd_device *obd, int count)
3818 struct llog_ctxt *ctxt;
3819 int rc = 0, rc2 = 0;
3822 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3824 rc = llog_cleanup(ctxt);
3826 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3828 rc2 = llog_cleanup(ctxt);
3835 static int osc_reconnect(const struct lu_env *env,
3836 struct obd_export *exp, struct obd_device *obd,
3837 struct obd_uuid *cluuid,
3838 struct obd_connect_data *data)
3840 struct client_obd *cli = &obd->u.cli;
3842 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3845 client_obd_list_lock(&cli->cl_loi_list_lock);
3846 data->ocd_grant = cli->cl_avail_grant ?:
3847 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3848 lost_grant = cli->cl_lost_grant;
3849 cli->cl_lost_grant = 0;
3850 client_obd_list_unlock(&cli->cl_loi_list_lock);
3852 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3853 "cl_lost_grant: %ld\n", data->ocd_grant,
3854 cli->cl_avail_grant, lost_grant);
3855 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3856 " ocd_grant: %d\n", data->ocd_connect_flags,
3857 data->ocd_version, data->ocd_grant);
3863 static int osc_disconnect(struct obd_export *exp)
3865 struct obd_device *obd = class_exp2obd(exp);
3866 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3869 if (obd->u.cli.cl_conn_count == 1)
3870 /* flush any remaining cancel messages out to the target */
3871 llog_sync(ctxt, exp);
3873 llog_ctxt_put(ctxt);
3875 rc = client_disconnect_export(exp);
3879 static int osc_import_event(struct obd_device *obd,
3880 struct obd_import *imp,
3881 enum obd_import_event event)
3883 struct client_obd *cli;
3887 LASSERT(imp->imp_obd == obd);
3890 case IMP_EVENT_DISCON: {
/* Only do this on the MDS OSCs */
3892 if (imp->imp_server_timeout) {
3893 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3895 spin_lock(&oscc->oscc_lock);
3896 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3897 spin_unlock(&oscc->oscc_lock);
3900 client_obd_list_lock(&cli->cl_loi_list_lock);
3901 cli->cl_avail_grant = 0;
3902 cli->cl_lost_grant = 0;
3903 client_obd_list_unlock(&cli->cl_loi_list_lock);
3906 case IMP_EVENT_INACTIVE: {
3907 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3910 case IMP_EVENT_INVALIDATE: {
3911 struct ldlm_namespace *ns = obd->obd_namespace;
3915 client_obd_list_lock(&cli->cl_loi_list_lock);
3916 /* all pages go to failing rpcs due to the invalid import */
3917 osc_check_rpcs(cli);
3918 client_obd_list_unlock(&cli->cl_loi_list_lock);
3920 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3924 case IMP_EVENT_ACTIVE: {
/* Only do this on the MDS OSCs */
3926 if (imp->imp_server_timeout) {
3927 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3929 spin_lock(&oscc->oscc_lock);
3930 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3931 spin_unlock(&oscc->oscc_lock);
3933 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3936 case IMP_EVENT_OCD: {
3937 struct obd_connect_data *ocd = &imp->imp_connect_data;
3939 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3940 osc_init_grant(&obd->u.cli, ocd);
3943 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3944 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3946 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3950 CERROR("Unknown import event %d\n", event);
3956 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3962 rc = ptlrpcd_addref();
3966 rc = client_obd_setup(obd, lcfg);
3970 struct lprocfs_static_vars lvars = { 0 };
3971 struct client_obd *cli = &obd->u.cli;
3973 lprocfs_osc_init_vars(&lvars);
3974 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3975 lproc_osc_attach_seqstat(obd);
3976 sptlrpc_lprocfs_cliobd_attach(obd);
3977 ptlrpc_lprocfs_register_obd(obd);
/* We need to allocate a few more requests, because brw_interpret
   tries to create new requests before freeing previous ones.
   Ideally we want to have 2x max_rpcs_in_flight reserved, but I'm
   afraid that might be too much wasted RAM in fact, so +2 is just
   my guess and should still work. */
3986 cli->cl_import->imp_rq_pool =
3987 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3989 ptlrpc_add_rqs_to_pool);
3990 cli->cl_cache = cache_create(obd);
3991 if (!cli->cl_cache) {
4000 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4006 case OBD_CLEANUP_EARLY: {
4007 struct obd_import *imp;
4008 imp = obd->u.cli.cl_import;
4009 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4010 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4011 ptlrpc_deactivate_import(imp);
4012 spin_lock(&imp->imp_lock);
4013 imp->imp_pingable = 0;
4014 spin_unlock(&imp->imp_lock);
4017 case OBD_CLEANUP_EXPORTS: {
4018 /* If we set up but never connected, the
4019 client import will not have been cleaned. */
4020 if (obd->u.cli.cl_import) {
4021 struct obd_import *imp;
4022 imp = obd->u.cli.cl_import;
4023 CDEBUG(D_CONFIG, "%s: client import never connected\n",
4025 ptlrpc_invalidate_import(imp);
4026 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4027 class_destroy_import(imp);
4028 obd->u.cli.cl_import = NULL;
4030 rc = obd_llog_finish(obd, 0);
4032 CERROR("failed to cleanup llogging subsystems\n");
4039 int osc_cleanup(struct obd_device *obd)
4041 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4045 ptlrpc_lprocfs_unregister_obd(obd);
4046 lprocfs_obd_cleanup(obd);
4048 spin_lock(&oscc->oscc_lock);
4049 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4050 oscc->oscc_flags |= OSCC_FLAG_EXITING;
4051 spin_unlock(&oscc->oscc_lock);
4053 /* free memory of osc quota cache */
4054 lquota_cleanup(quota_interface, obd);
4056 cache_destroy(obd->u.cli.cl_cache);
4057 rc = client_obd_cleanup(obd);
4063 static int osc_register_page_removal_cb(struct obd_export *exp,
4064 obd_page_removal_cb_t func,
4065 obd_pin_extent_cb pin_cb)
4067 return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4071 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4072 obd_page_removal_cb_t func)
4074 return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4077 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4078 obd_lock_cancel_cb cb)
4080 LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4082 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4086 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4087 obd_lock_cancel_cb cb)
4089 if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4090 CERROR("Unregistering cancel cb %p, while only %p was "
4092 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4096 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4100 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4102 struct lustre_cfg *lcfg = buf;
4103 struct lprocfs_static_vars lvars = { 0 };
4106 lprocfs_osc_init_vars(&lvars);
4108 switch (lcfg->lcfg_command) {
4109 case LCFG_SPTLRPC_CONF:
4110 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4113 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4121 struct obd_ops osc_obd_ops = {
4122 .o_owner = THIS_MODULE,
4123 .o_setup = osc_setup,
4124 .o_precleanup = osc_precleanup,
4125 .o_cleanup = osc_cleanup,
4126 .o_add_conn = client_import_add_conn,
4127 .o_del_conn = client_import_del_conn,
4128 .o_connect = client_connect_import,
4129 .o_reconnect = osc_reconnect,
4130 .o_disconnect = osc_disconnect,
4131 .o_statfs = osc_statfs,
4132 .o_statfs_async = osc_statfs_async,
4133 .o_packmd = osc_packmd,
4134 .o_unpackmd = osc_unpackmd,
4135 .o_precreate = osc_precreate,
4136 .o_create = osc_create,
4137 .o_destroy = osc_destroy,
4138 .o_getattr = osc_getattr,
4139 .o_getattr_async = osc_getattr_async,
4140 .o_setattr = osc_setattr,
4141 .o_setattr_async = osc_setattr_async,
4143 .o_brw_async = osc_brw_async,
4144 .o_prep_async_page = osc_prep_async_page,
4145 .o_reget_short_lock = osc_reget_short_lock,
4146 .o_release_short_lock = osc_release_short_lock,
4147 .o_queue_async_io = osc_queue_async_io,
4148 .o_set_async_flags = osc_set_async_flags,
4149 .o_queue_group_io = osc_queue_group_io,
4150 .o_trigger_group_io = osc_trigger_group_io,
4151 .o_teardown_async_page = osc_teardown_async_page,
4152 .o_punch = osc_punch,
4154 .o_enqueue = osc_enqueue,
4155 .o_match = osc_match,
4156 .o_change_cbdata = osc_change_cbdata,
4157 .o_cancel = osc_cancel,
4158 .o_cancel_unused = osc_cancel_unused,
4159 .o_join_lru = osc_join_lru,
4160 .o_iocontrol = osc_iocontrol,
4161 .o_get_info = osc_get_info,
4162 .o_set_info_async = osc_set_info_async,
4163 .o_import_event = osc_import_event,
4164 .o_llog_init = osc_llog_init,
4165 .o_llog_finish = osc_llog_finish,
4166 .o_process_config = osc_process_config,
4167 .o_register_page_removal_cb = osc_register_page_removal_cb,
4168 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4169 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4170 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4172 int __init osc_init(void)
4174 struct lprocfs_static_vars lvars = { 0 };
4178 lprocfs_osc_init_vars(&lvars);
4180 request_module("lquota");
4181 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4182 lquota_init(quota_interface);
4183 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4185 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4186 LUSTRE_OSC_NAME, NULL);
4188 if (quota_interface)
4189 PORTAL_SYMBOL_PUT(osc_quota_interface);
4197 static void /*__exit*/ osc_exit(void)
4199 lquota_exit(quota_interface);
4200 if (quota_interface)
4201 PORTAL_SYMBOL_PUT(osc_quota_interface);
4203 class_unregister_type(LUSTRE_OSC_NAME);
4206 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4207 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4208 MODULE_LICENSE("GPL");
4210 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);