1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_cksum.h>
55 #include <lustre_ha.h>
56 #include <lprocfs_status.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include <lustre_cache.h>
61 #include "osc_internal.h"
63 static quota_interface_t *quota_interface = NULL;
64 extern quota_interface_t osc_quota_interface;
66 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
67 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72 struct lov_stripe_md *lsm)
77 lmm_size = sizeof(**lmmp);
82 OBD_FREE(*lmmp, lmm_size);
88 OBD_ALLOC(*lmmp, lmm_size);
94 LASSERT(lsm->lsm_object_id);
95 LASSERT(lsm->lsm_object_gr);
96 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
97 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105 struct lov_mds_md *lmm, int lmm_bytes)
111 if (lmm_bytes < sizeof (*lmm)) {
112 CERROR("lov_mds_md too small: %d, need %d\n",
113 lmm_bytes, (int)sizeof(*lmm));
116 /* XXX LOV_MAGIC etc check? */
118 if (lmm->lmm_object_id == 0) {
119 CERROR("lov_mds_md: zero lmm_object_id\n");
124 lsm_size = lov_stripe_md_size(1);
128 if (*lsmp != NULL && lmm == NULL) {
129 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130 OBD_FREE(*lsmp, lsm_size);
136 OBD_ALLOC(*lsmp, lsm_size);
139 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141 OBD_FREE(*lsmp, lsm_size);
144 loi_init((*lsmp)->lsm_oinfo[0]);
148 /* XXX zero *lsmp? */
149 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
151 LASSERT((*lsmp)->lsm_object_id);
152 LASSERT((*lsmp)->lsm_object_gr);
155 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160 static inline void osc_pack_capa(struct ptlrpc_request *req,
161 struct ost_body *body, void *capa)
163 struct obd_capa *oc = (struct obd_capa *)capa;
164 struct lustre_capa *c;
169 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
172 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
173 DEBUG_CAPA(D_SEC, c, "pack");
176 static inline void osc_pack_req_body(struct ptlrpc_request *req,
177 struct obd_info *oinfo)
179 struct ost_body *body;
181 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
184 body->oa = *oinfo->oi_oa;
185 osc_pack_capa(req, body, oinfo->oi_capa);
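/* If no capability is supplied, shrink the client-side capa field to
 * zero so nothing is packed for it; otherwise the capsule's default
 * size for the field already fits. */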
188 static inline void osc_set_capa_size(struct ptlrpc_request *req,
189 const struct req_msg_field *field,
193 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
195 /* it is already calculated as sizeof struct obd_capa */
199 static int osc_getattr_interpret(struct ptlrpc_request *req,
200 struct osc_async_args *aa, int rc)
202 struct ost_body *body;
208 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
209 lustre_swab_ost_body);
211 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
212 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
214 /* This should really be sent by the OST */
215 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
216 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
218 CDEBUG(D_INFO, "can't unpack ost_body\n");
220 aa->aa_oi->oi_oa->o_valid = 0;
223 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
227 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
228 struct ptlrpc_request_set *set)
230 struct ptlrpc_request *req;
231 struct osc_async_args *aa;
235 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
239 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
240 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
242 ptlrpc_request_free(req);
246 osc_pack_req_body(req, oinfo);
248 ptlrpc_request_set_replen(req);
249 req->rq_interpret_reply = osc_getattr_interpret;
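/* Stash the callback arguments in the request itself; the CLASSERT
 * below verifies at compile time that they fit in the embedded
 * rq_async_args buffer. */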
251 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
252 aa = (struct osc_async_args *)&req->rq_async_args;
255 ptlrpc_set_add_req(set, req);
259 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
261 struct ptlrpc_request *req;
262 struct ost_body *body;
266 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
270 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
271 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
273 ptlrpc_request_free(req);
277 osc_pack_req_body(req, oinfo);
279 ptlrpc_request_set_replen(req);
281 rc = ptlrpc_queue_wait(req);
285 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
287 GOTO(out, rc = -EPROTO);
289 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
290 *oinfo->oi_oa = body->oa;
292 /* This should really be sent by the OST */
293 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
294 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
298 ptlrpc_req_finished(req);
302 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
303 struct obd_trans_info *oti)
305 struct ptlrpc_request *req;
306 struct ost_body *body;
310 LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
311 oinfo->oi_oa->o_gr > 0);
313 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
317 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
318 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
320 ptlrpc_request_free(req);
324 osc_pack_req_body(req, oinfo);
326 ptlrpc_request_set_replen(req);
329 rc = ptlrpc_queue_wait(req);
333 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
335 GOTO(out, rc = -EPROTO);
337 *oinfo->oi_oa = body->oa;
341 ptlrpc_req_finished(req);
345 static int osc_setattr_interpret(struct ptlrpc_request *req,
346 struct osc_async_args *aa, int rc)
348 struct ost_body *body;
354 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
356 GOTO(out, rc = -EPROTO);
358 *aa->aa_oi->oi_oa = body->oa;
360 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
364 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
365 struct obd_trans_info *oti,
366 struct ptlrpc_request_set *rqset)
368 struct ptlrpc_request *req;
369 struct osc_async_args *aa;
373 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
377 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
378 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
380 ptlrpc_request_free(req);
384 osc_pack_req_body(req, oinfo);
386 ptlrpc_request_set_replen(req);
388 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
390 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
393 /* do mds to ost setattr asynchronously */
395 /* Do not wait for response. */
396 ptlrpcd_add_req(req);
398 req->rq_interpret_reply = osc_setattr_interpret;
400 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
401 aa = (struct osc_async_args *)&req->rq_async_args;
404 ptlrpc_set_add_req(rqset, req);
410 int osc_real_create(struct obd_export *exp, struct obdo *oa,
411 struct lov_stripe_md **ea, struct obd_trans_info *oti)
413 struct ptlrpc_request *req;
414 struct ost_body *body;
415 struct lov_stripe_md *lsm;
424 rc = obd_alloc_memmd(exp, &lsm);
429 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
431 GOTO(out, rc = -ENOMEM);
433 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
435 ptlrpc_request_free(req);
439 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
443 ptlrpc_request_set_replen(req);
445 if (oa->o_valid & OBD_MD_FLINLINE) {
446 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
447 oa->o_flags == OBD_FL_DELORPHAN);
449 "delorphan from OST integration");
450 /* Don't resend the delorphan req */
451 req->rq_no_resend = req->rq_no_delay = 1;
454 rc = ptlrpc_queue_wait(req);
458 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
460 GOTO(out_req, rc = -EPROTO);
464 /* This should really be sent by the OST */
465 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
466 oa->o_valid |= OBD_MD_FLBLKSZ;
468 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
469 * have valid lsm_oinfo data structs, so don't go touching that.
470 * This needs to be fixed in a big way.
472 lsm->lsm_object_id = oa->o_id;
473 lsm->lsm_object_gr = oa->o_gr;
477 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
479 if (oa->o_valid & OBD_MD_FLCOOKIE) {
480 if (!oti->oti_logcookies)
481 oti_alloc_cookies(oti, 1);
482 *oti->oti_logcookies = *obdo_logcookie(oa);
486 CDEBUG(D_HA, "transno: "LPD64"\n",
487 lustre_msg_get_transno(req->rq_repmsg));
489 ptlrpc_req_finished(req);
492 obd_free_memmd(exp, &lsm);
496 static int osc_punch_interpret(struct ptlrpc_request *req,
497 struct osc_async_args *aa, int rc)
499 struct ost_body *body;
505 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
507 GOTO(out, rc = -EPROTO);
509 *aa->aa_oi->oi_oa = body->oa;
511 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
515 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
516 struct obd_trans_info *oti,
517 struct ptlrpc_request_set *rqset)
519 struct ptlrpc_request *req;
520 struct osc_async_args *aa;
521 struct ost_body *body;
526 CDEBUG(D_INFO, "oa NULL\n");
530 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
534 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
535 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
537 ptlrpc_request_free(req);
540 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
541 osc_pack_req_body(req, oinfo);
543 /* overload the size and blocks fields in the oa with start/end */
544 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
546 body->oa.o_size = oinfo->oi_policy.l_extent.start;
547 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
548 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
549 ptlrpc_request_set_replen(req);
552 req->rq_interpret_reply = osc_punch_interpret;
553 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
554 aa = (struct osc_async_args *)&req->rq_async_args;
556 ptlrpc_set_add_req(rqset, req);
561 static int osc_sync(struct obd_export *exp, struct obdo *oa,
562 struct lov_stripe_md *md, obd_size start, obd_size end,
565 struct ptlrpc_request *req;
566 struct ost_body *body;
571 CDEBUG(D_INFO, "oa NULL\n");
575 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
579 osc_set_capa_size(req, &RMF_CAPA1, capa);
580 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
582 ptlrpc_request_free(req);
586 /* overload the size and blocks fields in the oa with start/end */
587 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
590 body->oa.o_size = start;
591 body->oa.o_blocks = end;
592 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
593 osc_pack_capa(req, body, capa);
595 ptlrpc_request_set_replen(req);
597 rc = ptlrpc_queue_wait(req);
601 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
603 GOTO(out, rc = -EPROTO);
609 ptlrpc_req_finished(req);
613 /* Find and cancel local locks matched by @mode in the resource found by
614 * @objid. Found locks are added to the @cancels list. Returns the number
615 * of locks added to @cancels. */
616 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
617 struct list_head *cancels, ldlm_mode_t mode,
620 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
621 struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
622 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
629 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
630 lock_flags, 0, NULL);
631 ldlm_resource_putref(res);
635 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
638 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
640 atomic_dec(&cli->cl_destroy_in_flight);
641 cfs_waitq_signal(&cli->cl_destroy_waitq);
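/* Atomically reserve a slot for a destroy RPC among the
 * cl_max_rpcs_in_flight allowed in flight; on failure the reservation
 * is backed out, possibly waking a waiter to retry. */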
645 static int osc_can_send_destroy(struct client_obd *cli)
647 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
648 cli->cl_max_rpcs_in_flight) {
649 /* The destroy request can be sent */
652 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
653 cli->cl_max_rpcs_in_flight) {
655 * The counter has been modified between the two atomic
656 * operations, so wake up a possible waiter.
658 cfs_waitq_signal(&cli->cl_destroy_waitq);
663 /* Destroy requests can always be async on the client, and we don't even
664 * really care about the return code since the client cannot do anything
665 * at all about a failed destroy.
666 * When the MDS is unlinking a filename, it saves the file objects into a
667 * recovery llog, and these object records are cancelled when the OST reports
668 * they were destroyed and sync'd to disk (i.e. transaction committed).
669 * If the client dies, or the OST is down when the object should be destroyed,
670 * the records are not cancelled, and when the OST reconnects to the MDS next,
671 * it will retrieve the llog unlink logs and then send the log cancellation
672 * cookies to the MDS after committing destroy transactions. */
673 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
674 struct lov_stripe_md *ea, struct obd_trans_info *oti,
675 struct obd_export *md_export)
677 struct client_obd *cli = &exp->exp_obd->u.cli;
678 struct ptlrpc_request *req;
679 struct ost_body *body;
680 CFS_LIST_HEAD(cancels);
685 CDEBUG(D_INFO, "oa NULL\n");
689 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
690 LDLM_FL_DISCARD_DATA);
692 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
694 ldlm_lock_list_put(&cancels, l_bl_ast, count);
698 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
701 ptlrpc_request_free(req);
705 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
706 req->rq_interpret_reply = osc_destroy_interpret;
708 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
709 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
710 sizeof(*oti->oti_logcookies));
711 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
715 ptlrpc_request_set_replen(req);
717 if (!osc_can_send_destroy(cli)) {
718 struct l_wait_info lwi = { 0 };
721 * Wait until the number of ongoing destroy RPCs drops
722 * below cl_max_rpcs_in_flight
724 l_wait_event_exclusive(cli->cl_destroy_waitq,
725 osc_can_send_destroy(cli), &lwi);
728 /* Do not wait for response */
729 ptlrpcd_add_req(req);
733 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
736 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
738 LASSERT(!(oa->o_valid & bits));
741 client_obd_list_lock(&cli->cl_loi_list_lock);
742 oa->o_dirty = cli->cl_dirty;
743 if (cli->cl_dirty > cli->cl_dirty_max) {
744 CERROR("dirty %lu > dirty_max %lu\n",
745 cli->cl_dirty, cli->cl_dirty_max);
747 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
748 CERROR("dirty %d > system dirty_max %d\n",
749 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
751 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
752 CERROR("dirty %lu - dirty_max %lu too big???\n",
753 cli->cl_dirty, cli->cl_dirty_max);
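/* ask for enough grant to cover either a completely full dirty
 * cache or a full pipeline of maximum-sized RPCs, whichever is
 * larger */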
756 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
757 (cli->cl_max_rpcs_in_flight + 1);
758 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
760 oa->o_grant = cli->cl_avail_grant;
761 oa->o_dropped = cli->cl_lost_grant;
762 cli->cl_lost_grant = 0;
763 client_obd_list_unlock(&cli->cl_loi_list_lock);
764 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
765 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
768 /* caller must hold loi_list_lock */
769 static void osc_consume_write_grant(struct client_obd *cli,
770 struct brw_page *pga)
772 atomic_inc(&obd_dirty_pages);
773 cli->cl_dirty += CFS_PAGE_SIZE;
774 cli->cl_avail_grant -= CFS_PAGE_SIZE;
775 pga->flag |= OBD_BRW_FROM_GRANT;
776 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
777 CFS_PAGE_SIZE, pga, pga->pg);
778 LASSERT(cli->cl_avail_grant >= 0);
781 /* the companion to osc_consume_write_grant, called when a brw has completed.
782 * must be called with the loi lock held. */
783 static void osc_release_write_grant(struct client_obd *cli,
784 struct brw_page *pga, int sent)
786 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
789 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
794 pga->flag &= ~OBD_BRW_FROM_GRANT;
795 atomic_dec(&obd_dirty_pages);
796 cli->cl_dirty -= CFS_PAGE_SIZE;
798 cli->cl_lost_grant += CFS_PAGE_SIZE;
799 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
800 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
801 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
802 /* For short writes we shouldn't count parts of pages that
803 * span a whole block on the OST side, or our accounting goes
804 * wrong. Should match the code in filter_grant_check. */
805 int offset = pga->off & ~CFS_PAGE_MASK;
806 int count = pga->count + (offset & (blocksize - 1));
807 int end = (offset + pga->count) & (blocksize - 1);
809 count += blocksize - end;
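/* e.g. 16K pages over 4K blocks: a 1K write at page offset 512
 * rounds up to one whole 4K block, so the remaining 12K of the
 * page's grant is recorded as lost */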
811 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
812 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
813 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
814 cli->cl_avail_grant, cli->cl_dirty);
820 static unsigned long rpcs_in_flight(struct client_obd *cli)
822 return cli->cl_r_in_flight + cli->cl_w_in_flight;
825 /* caller must hold loi_list_lock */
826 void osc_wake_cache_waiters(struct client_obd *cli)
828 struct list_head *l, *tmp;
829 struct osc_cache_waiter *ocw;
832 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
833 /* if we can't dirty more, we must wait until some is written */
834 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
835 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
836 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
837 "osc max %ld, sys max %d\n", cli->cl_dirty,
838 cli->cl_dirty_max, obd_max_dirty_pages);
842 /* if the cache is still dirty but there is no grant, wait for pending
843 * RPCs that may yet return us some grant before doing sync writes */
844 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
845 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
846 cli->cl_w_in_flight);
850 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
851 list_del_init(&ocw->ocw_entry);
852 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
853 /* no more RPCs in flight to return grant, do sync IO */
854 ocw->ocw_rc = -EDQUOT;
855 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
857 osc_consume_write_grant(cli,
858 &ocw->ocw_oap->oap_brw_page);
861 cfs_waitq_signal(&ocw->ocw_waitq);
867 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
869 client_obd_list_lock(&cli->cl_loi_list_lock);
870 cli->cl_avail_grant = ocd->ocd_grant;
871 client_obd_list_unlock(&cli->cl_loi_list_lock);
873 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
874 cli->cl_avail_grant, cli->cl_lost_grant);
875 LASSERT(cli->cl_avail_grant >= 0);
878 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
880 client_obd_list_lock(&cli->cl_loi_list_lock);
881 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
882 if (body->oa.o_valid & OBD_MD_FLGRANT)
883 cli->cl_avail_grant += body->oa.o_grant;
884 /* waiters are woken in brw_interpret */
885 client_obd_list_unlock(&cli->cl_loi_list_lock);
888 /* We assume that the reason this OSC got a short read is that it read
889 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
890 * via the LOV, and it _knows_ it's reading inside the file, it's just that
891 * this stripe never got written at or beyond this stripe offset yet. */
892 static void handle_short_read(int nob_read, obd_count page_count,
893 struct brw_page **pga)
898 /* skip bytes read OK */
899 while (nob_read > 0) {
900 LASSERT (page_count > 0);
902 if (pga[i]->count > nob_read) {
903 /* EOF inside this page */
904 ptr = cfs_kmap(pga[i]->pg) +
905 (pga[i]->off & ~CFS_PAGE_MASK);
906 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
907 cfs_kunmap(pga[i]->pg);
913 nob_read -= pga[i]->count;
918 /* zero remaining pages */
919 while (page_count-- > 0) {
920 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
921 memset(ptr, 0, pga[i]->count);
922 cfs_kunmap(pga[i]->pg);
927 static int check_write_rcs(struct ptlrpc_request *req,
928 int requested_nob, int niocount,
929 obd_count page_count, struct brw_page **pga)
933 /* return error if any niobuf was in error */
934 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
935 sizeof(*remote_rcs) * niocount, NULL);
936 if (remote_rcs == NULL) {
937 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
940 if (lustre_msg_swabbed(req->rq_repmsg))
941 for (i = 0; i < niocount; i++)
942 __swab32s(&remote_rcs[i]);
944 for (i = 0; i < niocount; i++) {
945 if (remote_rcs[i] < 0)
946 return(remote_rcs[i]);
948 if (remote_rcs[i] != 0) {
949 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
950 i, remote_rcs[i], req);
955 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
956 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
957 requested_nob, req->rq_bulk->bd_nob_transferred);
964 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
966 if (p1->flag != p2->flag) {
967 unsigned mask = ~OBD_BRW_FROM_GRANT;
969 /* warn if we try to combine flags that we don't know to be
971 if ((p1->flag & mask) != (p2->flag & mask))
972 CERROR("is it ok to have flags 0x%x and 0x%x in the "
973 "same brw?\n", p1->flag, p2->flag);
977 return (p1->off + p1->count == p2->off);
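/* Checksum @nob bytes spread across @pga, kmapping each page in turn;
 * the OBD_FAIL hooks below deliberately corrupt the data (reads) or
 * the checksum (writes) to exercise the error paths in testing. */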
980 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
981 struct brw_page **pga, int opc,
982 cksum_type_t cksum_type)
987 LASSERT (pg_count > 0);
988 cksum = init_checksum(cksum_type);
989 while (nob > 0 && pg_count > 0) {
990 unsigned char *ptr = cfs_kmap(pga[i]->pg);
991 int off = pga[i]->off & ~CFS_PAGE_MASK;
992 int count = pga[i]->count > nob ? nob : pga[i]->count;
994 /* corrupt the data before we compute the checksum, to
995 * simulate an OST->client data error */
996 if (i == 0 && opc == OST_READ &&
997 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
998 memcpy(ptr + off, "bad1", min(4, nob));
999 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1000 cfs_kunmap(pga[i]->pg);
1001 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1004 nob -= pga[i]->count;
1008 /* For sending we only compute the wrong checksum instead
1009 * of corrupting the data so it is still correct on a redo */
1010 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1016 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1017 struct lov_stripe_md *lsm, obd_count page_count,
1018 struct brw_page **pga,
1019 struct ptlrpc_request **reqp,
1020 struct obd_capa *ocapa)
1022 struct ptlrpc_request *req;
1023 struct ptlrpc_bulk_desc *desc;
1024 struct ost_body *body;
1025 struct obd_ioobj *ioobj;
1026 struct niobuf_remote *niobuf;
1027 int niocount, i, requested_nob, opc, rc;
1028 struct osc_brw_async_args *aa;
1029 struct req_capsule *pill;
1032 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1033 RETURN(-ENOMEM); /* Recoverable */
1034 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1035 RETURN(-EINVAL); /* Fatal */
1037 if ((cmd & OBD_BRW_WRITE) != 0) {
1039 req = ptlrpc_request_alloc_pool(cli->cl_import,
1040 cli->cl_import->imp_rq_pool,
1044 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
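/* contiguous pages collapse into a single remote niobuf, so size the
 * niobuf array by counting the discontinuities */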
1050 for (niocount = i = 1; i < page_count; i++) {
1051 if (!can_merge_pages(pga[i - 1], pga[i]))
1055 pill = &req->rq_pill;
1056 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1057 niocount * sizeof(*niobuf));
1058 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1060 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1062 ptlrpc_request_free(req);
1065 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1067 if (opc == OST_WRITE)
1068 desc = ptlrpc_prep_bulk_imp(req, page_count,
1069 BULK_GET_SOURCE, OST_BULK_PORTAL);
1071 desc = ptlrpc_prep_bulk_imp(req, page_count,
1072 BULK_PUT_SINK, OST_BULK_PORTAL);
1075 GOTO(out, rc = -ENOMEM);
1076 /* NB request now owns desc and will free it when it gets freed */
1078 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1079 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1080 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1081 LASSERT(body && ioobj && niobuf);
1085 obdo_to_ioobj(oa, ioobj);
1086 ioobj->ioo_bufcnt = niocount;
1087 osc_pack_capa(req, body, ocapa);
1088 LASSERT (page_count > 0);
1089 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1090 struct brw_page *pg = pga[i];
1091 struct brw_page *pg_prev = pga[i - 1];
1093 LASSERT(pg->count > 0);
1094 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1095 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1096 pg->off, pg->count);
1098 LASSERTF(i == 0 || pg->off > pg_prev->off,
1099 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1100 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1102 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1103 pg_prev->pg, page_private(pg_prev->pg),
1104 pg_prev->pg->index, pg_prev->off);
1106 LASSERTF(i == 0 || pg->off > pg_prev->off,
1107 "i %d p_c %u\n", i, page_count);
1109 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1110 (pg->flag & OBD_BRW_SRVLOCK));
1112 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1114 requested_nob += pg->count;
1116 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1118 niobuf->len += pg->count;
1120 niobuf->offset = pg->off;
1121 niobuf->len = pg->count;
1122 niobuf->flags = pg->flag;
1126 LASSERT((void *)(niobuf - niocount) ==
1127 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1128 niocount * sizeof(*niobuf)));
1129 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1131 /* size[REQ_REC_OFF] still sizeof (*body) */
1132 if (opc == OST_WRITE) {
1133 if (unlikely(cli->cl_checksum) &&
1134 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1135 /* store cl_cksum_type in a local variable since
1136 * it can be changed via lprocfs */
1137 cksum_type_t cksum_type = cli->cl_cksum_type;
1139 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1140 oa->o_flags = body->oa.o_flags = 0;
1141 body->oa.o_flags |= cksum_type_pack(cksum_type);
1142 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1143 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1147 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1149 /* save this in 'oa', too, for later checking */
1150 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1151 oa->o_flags |= cksum_type_pack(cksum_type);
1153 /* clear out the checksum flag, in case this is a
1154 * resend but cl_checksum is no longer set. b=11238 */
1155 oa->o_valid &= ~OBD_MD_FLCKSUM;
1157 oa->o_cksum = body->oa.o_cksum;
1158 /* 1 RC per niobuf */
1159 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1160 sizeof(__u32) * niocount);
1162 if (unlikely(cli->cl_checksum) &&
1163 req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1164 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1165 body->oa.o_flags = 0;
1166 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1167 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1169 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1170 /* 1 RC for the whole I/O */
1172 ptlrpc_request_set_replen(req);
1174 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1175 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1177 aa->aa_requested_nob = requested_nob;
1178 aa->aa_nio_count = niocount;
1179 aa->aa_page_count = page_count;
1183 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1189 ptlrpc_req_finished(req);
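/* The OST reported a write checksum mismatch; recompute the checksum
 * over the local pages to guess whether the data was changed on the
 * client (e.g. mmap IO), changed in transit, or checksummed with a
 * different type by the server. */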
1193 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1194 __u32 client_cksum, __u32 server_cksum, int nob,
1195 obd_count page_count, struct brw_page **pga,
1196 cksum_type_t client_cksum_type)
1200 cksum_type_t cksum_type;
1202 if (server_cksum == client_cksum) {
1203 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1207 if (oa->o_valid & OBD_MD_FLFLAGS)
1208 cksum_type = cksum_type_unpack(oa->o_flags);
1210 cksum_type = OBD_CKSUM_CRC32;
1212 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1215 if (cksum_type != client_cksum_type)
1216 msg = "the server did not use the checksum type specified in "
1217 "the original request - likely a protocol problem";
1218 else if (new_cksum == server_cksum)
1219 msg = "changed on the client after we checksummed it - "
1220 "likely false positive due to mmap IO (bug 11742)";
1221 else if (new_cksum == client_cksum)
1222 msg = "changed in transit before arrival at OST";
1224 msg = "changed in transit AND doesn't match the original - "
1225 "likely false positive due to mmap IO (bug 11742)";
1227 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1228 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1229 "["LPU64"-"LPU64"]\n",
1230 msg, libcfs_nid2str(peer->nid),
1231 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1232 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1235 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1237 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1238 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1239 "client csum now %x\n", client_cksum, client_cksum_type,
1240 server_cksum, cksum_type, new_cksum);
1244 /* Note rc enters this function as number of bytes transferred */
1245 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1247 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1248 const lnet_process_id_t *peer =
1249 &req->rq_import->imp_connection->c_peer;
1250 struct client_obd *cli = aa->aa_cli;
1251 struct ost_body *body;
1252 __u32 client_cksum = 0;
1255 if (rc < 0 && rc != -EDQUOT)
1258 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1259 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1260 lustre_swab_ost_body);
1262 CDEBUG(D_INFO, "Can't unpack body\n");
1266 /* set/clear over quota flag for a uid/gid */
1267 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1268 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1269 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1270 body->oa.o_gid, body->oa.o_valid,
1276 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1277 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1279 osc_update_grant(cli, body);
1281 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1283 CERROR("Unexpected +ve rc %d\n", rc);
1286 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1288 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1289 check_write_checksum(&body->oa, peer, client_cksum,
1290 body->oa.o_cksum, aa->aa_requested_nob,
1291 aa->aa_page_count, aa->aa_ppga,
1292 cksum_type_unpack(aa->aa_oa->o_flags)))
1295 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1298 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1299 aa->aa_page_count, aa->aa_ppga);
1303 /* The rest of this function executes only for OST_READs */
1304 if (rc > aa->aa_requested_nob) {
1305 CERROR("Unexpected rc %d (%d requested)\n", rc,
1306 aa->aa_requested_nob);
1310 if (rc != req->rq_bulk->bd_nob_transferred) {
1311 CERROR ("Unexpected rc %d (%d transferred)\n",
1312 rc, req->rq_bulk->bd_nob_transferred);
1316 if (rc < aa->aa_requested_nob)
1317 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1319 if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1321 GOTO(out, rc = -EAGAIN);
1323 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1324 static int cksum_counter;
1325 __u32 server_cksum = body->oa.o_cksum;
1328 cksum_type_t cksum_type;
1330 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1331 cksum_type = cksum_type_unpack(body->oa.o_flags);
1333 cksum_type = OBD_CKSUM_CRC32;
1334 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1335 aa->aa_ppga, OST_READ,
1338 if (peer->nid == req->rq_bulk->bd_sender) {
1342 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1345 if (server_cksum == ~0 && rc > 0) {
1346 CERROR("Protocol error: server %s set the 'checksum' "
1347 "bit, but didn't send a checksum. Not fatal, "
1348 "but please tell CFS.\n",
1349 libcfs_nid2str(peer->nid));
1350 } else if (server_cksum != client_cksum) {
1351 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1352 "%s%s%s inum "LPU64"/"LPU64" object "
1353 LPU64"/"LPU64" extent "
1354 "["LPU64"-"LPU64"]\n",
1355 req->rq_import->imp_obd->obd_name,
1356 libcfs_nid2str(peer->nid),
1358 body->oa.o_valid & OBD_MD_FLFID ?
1359 body->oa.o_fid : (__u64)0,
1360 body->oa.o_valid & OBD_MD_FLFID ?
1361 body->oa.o_generation :(__u64)0,
1363 body->oa.o_valid & OBD_MD_FLGROUP ?
1364 body->oa.o_gr : (__u64)0,
1365 aa->aa_ppga[0]->off,
1366 aa->aa_ppga[aa->aa_page_count-1]->off +
1367 aa->aa_ppga[aa->aa_page_count-1]->count -
1369 CERROR("client %x, server %x, cksum_type %x\n",
1370 client_cksum, server_cksum, cksum_type);
1372 aa->aa_oa->o_cksum = client_cksum;
1376 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1379 } else if (unlikely(client_cksum)) {
1380 static int cksum_missed;
1383 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1384 CERROR("Checksum %u requested from %s but not sent\n",
1385 cksum_missed, libcfs_nid2str(peer->nid));
1391 *aa->aa_oa = body->oa;
1396 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1397 struct lov_stripe_md *lsm,
1398 obd_count page_count, struct brw_page **pga,
1399 struct obd_capa *ocapa)
1401 struct ptlrpc_request *req;
1405 struct l_wait_info lwi;
1409 cfs_waitq_init(&waitq);
1412 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1413 page_count, pga, &req, ocapa);
1417 rc = ptlrpc_queue_wait(req);
1419 if (rc == -ETIMEDOUT && req->rq_resend) {
1420 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1421 ptlrpc_req_finished(req);
1425 rc = osc_brw_fini_request(req, rc);
1427 ptlrpc_req_finished(req);
1428 if (osc_recoverable_error(rc)) {
1430 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1431 CERROR("too many resend retries, returning error\n");
1435 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1436 l_wait_event(waitq, 0, &lwi);
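/* nothing ever signals @waitq; the wait above simply sleeps for
 * 'resends' seconds as a backoff before retrying the recoverable
 * error */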
1444 int osc_brw_redo_request(struct ptlrpc_request *request,
1445 struct osc_brw_async_args *aa)
1447 struct ptlrpc_request *new_req;
1448 struct ptlrpc_request_set *set = request->rq_set;
1449 struct osc_brw_async_args *new_aa;
1450 struct osc_async_page *oap;
1454 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1455 CERROR("too many resend retries, returning error\n");
1459 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1461 body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1462 if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1463 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1466 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1467 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1468 aa->aa_cli, aa->aa_oa,
1469 NULL /* lsm unused by osc currently */,
1470 aa->aa_page_count, aa->aa_ppga,
1471 &new_req, NULL /* ocapa */);
1475 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1477 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1478 if (oap->oap_request != NULL) {
1479 LASSERTF(request == oap->oap_request,
1480 "request %p != oap_request %p\n",
1481 request, oap->oap_request);
1482 if (oap->oap_interrupted) {
1483 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1484 ptlrpc_req_finished(new_req);
1489 /* New request takes over pga and oaps from old request.
1490 * Note that copying a list_head doesn't work, need to move it... */
1492 new_req->rq_interpret_reply = request->rq_interpret_reply;
1493 new_req->rq_async_args = request->rq_async_args;
1494 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1496 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1498 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1499 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1500 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1502 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1503 if (oap->oap_request) {
1504 ptlrpc_req_finished(oap->oap_request);
1505 oap->oap_request = ptlrpc_request_addref(new_req);
1509 /* using ptlrpc_set_add_req here is safe because the interpret functions
1510 * run in check_set context. the only path by which another thread can
1511 * reach the request is the -EINTR case, and that is protected by
1512 * cl_loi_list_lock */
1513 ptlrpc_set_add_req(set, new_req);
1515 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1517 DEBUG_REQ(D_INFO, new_req, "new request");
1521 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1522 struct lov_stripe_md *lsm, obd_count page_count,
1523 struct brw_page **pga, struct ptlrpc_request_set *set,
1524 struct obd_capa *ocapa)
1526 struct ptlrpc_request *req;
1527 struct client_obd *cli = &exp->exp_obd->u.cli;
1529 struct osc_brw_async_args *aa;
1532 /* Consume write credits even if doing a sync write -
1533 * otherwise we may run out of space on OST due to grant. */
1534 if (cmd == OBD_BRW_WRITE) {
1535 spin_lock(&cli->cl_loi_list_lock);
1536 for (i = 0; i < page_count; i++) {
1537 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1538 osc_consume_write_grant(cli, pga[i]);
1540 spin_unlock(&cli->cl_loi_list_lock);
1543 rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1546 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1547 if (cmd == OBD_BRW_READ) {
1548 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1549 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1550 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1552 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1553 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1554 cli->cl_w_in_flight);
1555 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1558 LASSERT(list_empty(&aa->aa_oaps));
1560 req->rq_interpret_reply = brw_interpret;
1561 ptlrpc_set_add_req(set, req);
1562 client_obd_list_lock(&cli->cl_loi_list_lock);
1563 if (cmd == OBD_BRW_READ)
1564 cli->cl_r_in_flight++;
1566 cli->cl_w_in_flight++;
1567 client_obd_list_unlock(&cli->cl_loi_list_lock);
1568 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1569 } else if (cmd == OBD_BRW_WRITE) {
1570 client_obd_list_lock(&cli->cl_loi_list_lock);
1571 for (i = 0; i < page_count; i++)
1572 osc_release_write_grant(cli, pga[i], 0);
1573 osc_wake_cache_waiters(cli);
1574 client_obd_list_unlock(&cli->cl_loi_list_lock);
1580 * ugh, we want disk allocation on the target to happen in offset order. we'll
1581 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1582 * fine for our small page arrays and doesn't require allocation. it's an
1583 * insertion sort that swaps elements that are strides apart, shrinking the
1584 * stride down until it's '1' and the array is sorted.
1586 static void sort_brw_pages(struct brw_page **array, int num)
1589 struct brw_page *tmp;
1593 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
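/* walk up Knuth's 3h+1 increment sequence: 1, 4, 13, 40, ... */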
1598 for (i = stride ; i < num ; i++) {
1601 while (j >= stride && array[j - stride]->off > tmp->off) {
1602 array[j] = array[j - stride];
1607 } while (stride > 1);
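/* Count how many of the sorted pages, starting at pg[0], form one
 * gapless stretch the network can move in a single RDMA: stop at the
 * first page that ends, or the next that starts, off a page boundary. */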
1610 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1616 LASSERT (pages > 0);
1617 offset = pg[i]->off & ~CFS_PAGE_MASK;
1621 if (pages == 0) /* that's all */
1624 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1625 return count; /* doesn't end on page boundary */
1628 offset = pg[i]->off & ~CFS_PAGE_MASK;
1629 if (offset != 0) /* doesn't start on page boundary */
1636 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1638 struct brw_page **ppga;
1641 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1645 for (i = 0; i < count; i++)
1650 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1652 LASSERT(ppga != NULL);
1653 OBD_FREE(ppga, sizeof(*ppga) * count);
1656 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1657 obd_count page_count, struct brw_page *pga,
1658 struct obd_trans_info *oti)
1660 struct obdo *saved_oa = NULL;
1661 struct brw_page **ppga, **orig;
1662 struct obd_import *imp = class_exp2cliimp(exp);
1663 struct client_obd *cli = &imp->imp_obd->u.cli;
1664 int rc, page_count_orig;
1667 if (cmd & OBD_BRW_CHECK) {
1668 /* The caller just wants to know if there's a chance that this
1669 * I/O can succeed */
1671 if (imp == NULL || imp->imp_invalid)
1676 /* test_brw with a failed create can trip this, maybe others. */
1677 LASSERT(cli->cl_max_pages_per_rpc);
1681 orig = ppga = osc_build_ppga(pga, page_count);
1684 page_count_orig = page_count;
1686 sort_brw_pages(ppga, page_count);
1687 while (page_count) {
1688 obd_count pages_per_brw;
1690 if (page_count > cli->cl_max_pages_per_rpc)
1691 pages_per_brw = cli->cl_max_pages_per_rpc;
1693 pages_per_brw = page_count;
1695 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1697 if (saved_oa != NULL) {
1698 /* restore previously saved oa */
1699 *oinfo->oi_oa = *saved_oa;
1700 } else if (page_count > pages_per_brw) {
1701 /* save a copy of oa (brw will clobber it) */
1702 OBDO_ALLOC(saved_oa);
1703 if (saved_oa == NULL)
1704 GOTO(out, rc = -ENOMEM);
1705 *saved_oa = *oinfo->oi_oa;
1708 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1709 pages_per_brw, ppga, oinfo->oi_capa);
1714 page_count -= pages_per_brw;
1715 ppga += pages_per_brw;
1719 osc_release_ppga(orig, page_count_orig);
1721 if (saved_oa != NULL)
1722 OBDO_FREE(saved_oa);
1727 static int osc_brw_async(int cmd, struct obd_export *exp,
1728 struct obd_info *oinfo, obd_count page_count,
1729 struct brw_page *pga, struct obd_trans_info *oti,
1730 struct ptlrpc_request_set *set)
1732 struct brw_page **ppga, **orig;
1733 struct client_obd *cli = &exp->exp_obd->u.cli;
1734 int page_count_orig;
1738 if (cmd & OBD_BRW_CHECK) {
1739 struct obd_import *imp = class_exp2cliimp(exp);
1740 /* The caller just wants to know if there's a chance that this
1741 * I/O can succeed */
1743 if (imp == NULL || imp->imp_invalid)
1748 orig = ppga = osc_build_ppga(pga, page_count);
1751 page_count_orig = page_count;
1753 sort_brw_pages(ppga, page_count);
1754 while (page_count) {
1755 struct brw_page **copy;
1756 obd_count pages_per_brw;
1758 pages_per_brw = min_t(obd_count, page_count,
1759 cli->cl_max_pages_per_rpc);
1761 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1763 /* use ppga only if a single RPC is going to fly */
1764 if (pages_per_brw != page_count_orig || ppga != orig) {
1765 OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1767 GOTO(out, rc = -ENOMEM);
1768 memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1772 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1773 pages_per_brw, copy, set, oinfo->oi_capa);
1777 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1781 /* we passed it to async_internal() which is
1782 * now responsible for releasing memory */
1786 page_count -= pages_per_brw;
1787 ppga += pages_per_brw;
1791 osc_release_ppga(orig, page_count_orig);
1795 static void osc_check_rpcs(struct client_obd *cli);
1797 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1798 * the dirty accounting. Writeback completes or truncate happens before
1799 * writing starts. Must be called with the loi lock held. */
1800 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1803 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1807 /* This maintains the lists of pending pages to read/write for a given object
1808 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1809 * to quickly find objects that are ready to send an RPC. */
1810 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1816 if (lop->lop_num_pending == 0)
1819 /* if we have an invalid import we want to drain the queued pages
1820 * by forcing them through rpcs that immediately fail and complete
1821 * the pages. recovery relies on this to empty the queued pages
1822 * before canceling the locks and evicting down the llite pages */
1823 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1826 /* stream rpcs in queue order as long as there is an urgent page
1827 * queued. this is our cheap solution for good batching in the case
1828 * where writepage marks some random page in the middle of the file
1829 * as urgent because of, say, memory pressure */
1830 if (!list_empty(&lop->lop_urgent)) {
1831 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1834 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1835 optimal = cli->cl_max_pages_per_rpc;
1836 if (cmd & OBD_BRW_WRITE) {
1837 /* trigger a write rpc stream as long as there are dirtiers
1838 * waiting for space. as they're waiting, they're not going to
1839 * create more pages to coalesce with what's waiting. */
1840 if (!list_empty(&cli->cl_cache_waiters)) {
1841 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1844 /* +16 to avoid triggering rpcs that would want to include pages
1845 * that are being queued but which can't be made ready until
1846 * the queuer finishes with the page. this is a wart for
1847 * llite::commit_write() */
1850 if (lop->lop_num_pending >= optimal)
1856 static void on_list(struct list_head *item, struct list_head *list,
1859 if (list_empty(item) && should_be_on)
1860 list_add_tail(item, list);
1861 else if (!list_empty(item) && !should_be_on)
1862 list_del_init(item);
1865 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1866 * can find pages to build into rpcs quickly */
1867 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1869 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1870 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1871 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1873 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1874 loi->loi_write_lop.lop_num_pending);
1876 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1877 loi->loi_read_lop.lop_num_pending);
1880 static void lop_update_pending(struct client_obd *cli,
1881 struct loi_oap_pages *lop, int cmd, int delta)
1883 lop->lop_num_pending += delta;
1884 if (cmd & OBD_BRW_WRITE)
1885 cli->cl_pending_w_pages += delta;
1887 cli->cl_pending_r_pages += delta;
1890 /* this is called when a sync waiter receives an interruption. Its job is to
1891 * get the caller woken as soon as possible. If its page hasn't been put in an
1892 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1893 * desiring interruption, which will forcefully complete the rpc once
1894 * the rpc completes. */
1895 static void osc_occ_interrupted(struct oig_callback_context *occ)
1897 struct osc_async_page *oap;
1898 struct loi_oap_pages *lop;
1899 struct lov_oinfo *loi;
1902 /* XXX member_of() */
1903 oap = list_entry(occ, struct osc_async_page, oap_occ);
1905 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1907 oap->oap_interrupted = 1;
1909 /* ok, it's been put in an rpc. only one oap gets a request reference */
1910 if (oap->oap_request != NULL) {
1911 ptlrpc_mark_interrupted(oap->oap_request);
1912 ptlrpcd_wake(oap->oap_request);
1916 /* we don't get interruption callbacks until osc_trigger_group_io()
1917 * has been called and put the sync oaps in the pending/urgent lists.*/
1918 if (!list_empty(&oap->oap_pending_item)) {
1919 list_del_init(&oap->oap_pending_item);
1920 list_del_init(&oap->oap_urgent_item);
1923 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1924 &loi->loi_write_lop : &loi->loi_read_lop;
1925 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1926 loi_list_maint(oap->oap_cli, oap->oap_loi);
1928 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1929 oap->oap_oig = NULL;
1933 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1936 /* this is trying to propagate async writeback errors back up to the
1937 * application. As an async write fails we record the error code for later if
1938 * the app does an fsync. As long as errors persist we force future rpcs to be
1939 * sync so that the app can get a sync error and break the cycle of queueing
1940 * pages for which writeback will fail. */
1941 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1948 ar->ar_force_sync = 1;
1949 ar->ar_min_xid = ptlrpc_sample_next_xid();
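/* any later completion with an xid at or past ar_min_xid means the
 * writes queued before the failure have drained, so sync mode can be
 * cleared below */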
1954 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1955 ar->ar_force_sync = 0;
1958 static void osc_oap_to_pending(struct osc_async_page *oap)
1960 struct loi_oap_pages *lop;
1962 if (oap->oap_cmd & OBD_BRW_WRITE)
1963 lop = &oap->oap_loi->loi_write_lop;
1965 lop = &oap->oap_loi->loi_read_lop;
1967 if (oap->oap_async_flags & ASYNC_URGENT)
1968 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1969 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1970 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1973 /* this must be called holding the loi list lock to give coverage to exit_cache,
1974 * async_flag maintenance, and oap_request */
1975 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1976 struct osc_async_page *oap, int sent, int rc)
1981 if (oap->oap_request != NULL) {
1982 xid = ptlrpc_req_xid(oap->oap_request);
1983 ptlrpc_req_finished(oap->oap_request);
1984 oap->oap_request = NULL;
1987 oap->oap_async_flags = 0;
1988 oap->oap_interrupted = 0;
1990 if (oap->oap_cmd & OBD_BRW_WRITE) {
1991 osc_process_ar(&cli->cl_ar, xid, rc);
1992 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1995 if (rc == 0 && oa != NULL) {
1996 if (oa->o_valid & OBD_MD_FLBLOCKS)
1997 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1998 if (oa->o_valid & OBD_MD_FLMTIME)
1999 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2000 if (oa->o_valid & OBD_MD_FLATIME)
2001 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2002 if (oa->o_valid & OBD_MD_FLCTIME)
2003 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2007 osc_exit_cache(cli, oap, sent);
2008 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2009 oap->oap_oig = NULL;
2014 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2015 oap->oap_cmd, oa, rc);
2017 /* ll_ap_completion (from llite) drops PG_locked. so, a new
2018 * I/O on the page could start, but OSC calls it under lock
2019 * and thus we can add oap back to pending safely */
2021 /* upper layer wants to leave the page on pending queue */
2022 osc_oap_to_pending(oap);
2024 osc_exit_cache(cli, oap, sent);
2028 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
2030 struct osc_brw_async_args *aa = data;
2031 struct client_obd *cli;
2034 rc = osc_brw_fini_request(req, rc);
2035 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2036 if (osc_recoverable_error(rc)) {
2037 rc = osc_brw_redo_request(req, aa);
2044 client_obd_list_lock(&cli->cl_loi_list_lock);
2046 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2047 * is called so we know whether to go to sync BRWs or wait for more
2048 * RPCs to complete */
2049 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2050 cli->cl_w_in_flight--;
2052 cli->cl_r_in_flight--;
2054 if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2055 struct osc_async_page *oap, *tmp;
2056 /* the caller may re-use the oap after the completion call so
2057 * we need to clean it up a little */
2058 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2059 list_del_init(&oap->oap_rpc_item);
2060 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2062 OBDO_FREE(aa->aa_oa);
2063 } else { /* from async_internal() */
2065 for (i = 0; i < aa->aa_page_count; i++)
2066 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2068 osc_wake_cache_waiters(cli);
2069 osc_check_rpcs(cli);
2070 client_obd_list_unlock(&cli->cl_loi_list_lock);
2072 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2076 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2077 struct list_head *rpc_list,
2078 int page_count, int cmd)
2080 struct ptlrpc_request *req;
2081 struct brw_page **pga = NULL;
2082 struct osc_brw_async_args *aa;
2083 struct obdo *oa = NULL;
2084 struct obd_async_page_ops *ops = NULL;
2085 void *caller_data = NULL;
2086 struct obd_capa *ocapa;
2087 struct osc_async_page *oap;
2091 LASSERT(!list_empty(rpc_list));
2093 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2095 RETURN(ERR_PTR(-ENOMEM));
2099 GOTO(out, req = ERR_PTR(-ENOMEM));
2102 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2104 ops = oap->oap_caller_ops;
2105 caller_data = oap->oap_caller_data;
2107 pga[i] = &oap->oap_brw_page;
2108 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2109 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2110 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2114 /* always get the data for the obdo for the rpc */
2115 LASSERT(ops != NULL);
2116 ops->ap_fill_obdo(caller_data, cmd, oa);
2117 ocapa = ops->ap_lookup_capa(caller_data, cmd);
2119 sort_brw_pages(pga, page_count);
2120 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2124 CERROR("prep_req failed: %d\n", rc);
2125 GOTO(out, req = ERR_PTR(rc));
2128 /* Need to update the timestamps after the request is built in case
2129 * we race with setattr (locally or in queue at OST). If the OST gets a
2130 * later setattr before an earlier BRW (as determined by the request xid),
2131 * the OST will not use BRW timestamps. Sadly, there is no obvious
2132 * way to do this in a single call. bug 10150 */
2133 ops->ap_update_obdo(caller_data, cmd, oa,
2134 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2136 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2137 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2138 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2139 list_splice(rpc_list, &aa->aa_oaps);
2140 CFS_INIT_LIST_HEAD(rpc_list);
2147 OBD_FREE(pga, sizeof(*pga) * page_count);
2152 /* the loi lock is held across this function but it's allowed to release
2153 * and reacquire it during its work */
2155 * prepare pages for ASYNC io and put them in the send queue.
2159 * \param cmd - OBD_BRW_* macros
2160 * \param lop - pending pages
2162 * \return zero if the pages were successfully added to the send queue.
2163 * \return nonzero if an error occurred.
2165 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2166 int cmd, struct loi_oap_pages *lop)
2168 struct ptlrpc_request *req;
2169 obd_count page_count = 0;
2170 struct osc_async_page *oap = NULL, *tmp;
2171 struct osc_brw_async_args *aa;
2172 struct obd_async_page_ops *ops;
2173 CFS_LIST_HEAD(rpc_list);
2174 unsigned int ending_offset;
2175 unsigned starting_offset = 0;
2179 /* first we find the pages we're allowed to work with */
2180 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2182 ops = oap->oap_caller_ops;
2184 LASSERT(oap->oap_magic == OAP_MAGIC);
2186 if (page_count != 0 &&
2187 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2188 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2189 " oap %p, page %p, srvlock %u\n",
2190 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2193 /* in llite being 'ready' equates to the page being locked
2194 * until completion unlocks it. commit_write submits a page
2195 * as not ready because its unlock will happen unconditionally
2196 * as the call returns. if we race with commit_write giving
2197 * us that page we don't want to create a hole in the page
2198 * stream, so we stop and leave the rpc to be fired by
2199 * another dirtier or kupdated interval (the not ready page
2200 * will still be on the dirty list). we could call in
2201 * at the end of ll_file_write to process the queue again. */
2202 if (!(oap->oap_async_flags & ASYNC_READY)) {
2203 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2205 CDEBUG(D_INODE, "oap %p page %p returned %d "
2206 "instead of ready\n", oap,
2210 /* llite is telling us that the page is still
2211  * in commit_write and that we should try
2212  * to put it in an rpc again later. we
2213  * break out of the loop so we don't create
2214  * a hole in the sequence of pages in the rpc
2219 /* the io isn't needed; tell the checks
2220  * below to complete the rpc with EINTR */
2221 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2222 oap->oap_count = -EINTR;
2225 oap->oap_async_flags |= ASYNC_READY;
2228 LASSERTF(0, "oap %p page %p returned %d "
2229 "from make_ready\n", oap,
2237 * Page submitted for IO has to be locked. Either by
2238 * ->ap_make_ready() or by higher layers.
2240 #if defined(__KERNEL__) && defined(__linux__)
2241 if (!(PageLocked(oap->oap_page) &&
2242      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig != NULL))) {
2243 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2244 oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2248 /* If there is a gap at the start of this page, it can't merge
2249 * with any previous page, so we'll hand the network a
2250 * "fragmented" page array that it can't transfer in 1 RDMA */
2251 if (page_count != 0 && oap->oap_page_off != 0)
2254 /* take the page out of our book-keeping */
2255 list_del_init(&oap->oap_pending_item);
2256 lop_update_pending(cli, lop, cmd, -1);
2257 list_del_init(&oap->oap_urgent_item);
2259 if (page_count == 0)
2260 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2261 (PTLRPC_MAX_BRW_SIZE - 1);
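                        /* The mask keeps only the offset within one
                         * PTLRPC_MAX_BRW_SIZE window.  As a sketch, assuming
                         * a 1MB maximum BRW size (0x100000), an oap at
                         * object offset 0x2f3000 gives
                         *
                         *      0x2f3000 & 0xfffff == 0xf3000
                         *
                         * i.e. this rpc starts 0xf3000 bytes into its
                         * 1MB-aligned window. */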
2263 /* ask the caller for the size of the io as the rpc leaves. */
2264 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2266 ops->ap_refresh_count(oap->oap_caller_data, cmd);
2267 if (oap->oap_count <= 0) {
2268 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2270 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2274 /* now put the page back in our accounting */
2275 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2276 if (page_count == 0)
2277 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2278 if (++page_count >= cli->cl_max_pages_per_rpc)
2281 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2282 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2283 * have the same alignment as the initial writes that allocated
2284 * extents on the server. */
2285 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2286 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2287 if (ending_offset == 0)
2290 /* If there is a gap at the end of this page, it can't merge
2291 * with any subsequent pages, so we'll hand the network a
2292 * "fragmented" page array that it can't transfer in 1 RDMA */
2293 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2297 osc_wake_cache_waiters(cli);
2299 if (page_count == 0)
2302 loi_list_maint(cli, loi);
2304 client_obd_list_unlock(&cli->cl_loi_list_lock);
2306 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2308 /* this should happen rarely and is pretty bad; it makes the
2309  * pending list no longer follow the dirty order */
2310 client_obd_list_lock(&cli->cl_loi_list_lock);
2311 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2312 list_del_init(&oap->oap_rpc_item);
2314 /* queued sync pages can be torn down while they
2315  * were between the pending list and the rpc */
2316 if (oap->oap_interrupted) {
2317 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2318 osc_ap_completion(cli, NULL, oap, 0,
2322 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2324 loi_list_maint(cli, loi);
2325 RETURN(PTR_ERR(req));
2328 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2330 if (cmd == OBD_BRW_READ) {
2331 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2332 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2333 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2334 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2335 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2337 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2338 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2339 cli->cl_w_in_flight);
2340 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2341 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2342 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2345 client_obd_list_lock(&cli->cl_loi_list_lock);
2347 if (cmd == OBD_BRW_READ)
2348 cli->cl_r_in_flight++;
2350 cli->cl_w_in_flight++;
2352 /* queued sync pages can be torn down while they
2353  * were between the pending list and the rpc */
2355 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2356 /* only one oap gets a request reference */
2359 if (oap->oap_interrupted && !req->rq_intr) {
2360 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2362 ptlrpc_mark_interrupted(req);
2366 tmp->oap_request = ptlrpc_request_addref(req);
2368 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2369 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2371 req->rq_interpret_reply = brw_interpret;
2372 ptlrpcd_add_req(req);
2376 #define LOI_DEBUG(LOI, STR, args...) \
2377 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2378 !list_empty(&(LOI)->loi_cli_item), \
2379 (LOI)->loi_write_lop.lop_num_pending, \
2380 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2381 (LOI)->loi_read_lop.lop_num_pending, \
2382 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2385 /* This is called by osc_check_rpcs() to find which objects have pages that
2386 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2387 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2390 /* first return all objects which we already know to have
2391 * pages ready to be stuffed into rpcs */
2392 if (!list_empty(&cli->cl_loi_ready_list))
2393 RETURN(list_entry(cli->cl_loi_ready_list.next,
2394 struct lov_oinfo, loi_cli_item));
2396 /* then if we have cache waiters, return all objects with queued
2397  * writes. This is especially important when many small files
2398  * have filled up the cache but not been fired into rpcs because
2399  * they don't pass the nr_pending/object threshold */
2400 if (!list_empty(&cli->cl_cache_waiters) &&
2401 !list_empty(&cli->cl_loi_write_list))
2402 RETURN(list_entry(cli->cl_loi_write_list.next,
2403 struct lov_oinfo, loi_write_item));
2405 /* then return all queued objects when we have an invalid import
2406 * so that they get flushed */
2407 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2408 if (!list_empty(&cli->cl_loi_write_list))
2409 RETURN(list_entry(cli->cl_loi_write_list.next,
2410 struct lov_oinfo, loi_write_item));
2411 if (!list_empty(&cli->cl_loi_read_list))
2412 RETURN(list_entry(cli->cl_loi_read_list.next,
2413 struct lov_oinfo, loi_read_item));
2418 /* called with the loi list lock held */
2419 static void osc_check_rpcs(struct client_obd *cli)
2421 struct lov_oinfo *loi;
2422 int rc = 0, race_counter = 0;
2425 while ((loi = osc_next_loi(cli)) != NULL) {
2426 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2428 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2431 /* attempt some read/write balancing by alternating between
2432  * reads and writes in an object. The makes_rpc checks here
2433  * would be redundant if we were getting read/write work items
2434  * instead of objects. we don't want send_oap_rpc to drain a
2435  * partial read pending queue when we're given this object to
2436  * do write io on while there are cache waiters */
2437 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2438 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2439 &loi->loi_write_lop);
2447 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2448 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2449 &loi->loi_read_lop);
2458 /* attempt some inter-object balancing by issuing rpcs
2459  * for each object in turn */
2460 if (!list_empty(&loi->loi_cli_item))
2461 list_del_init(&loi->loi_cli_item);
2462 if (!list_empty(&loi->loi_write_item))
2463 list_del_init(&loi->loi_write_item);
2464 if (!list_empty(&loi->loi_read_item))
2465 list_del_init(&loi->loi_read_item);
2467 loi_list_maint(cli, loi);
2469 /* send_oap_rpc fails with 0 when make_ready tells it to
2470 * back off. llite's make_ready does this when it tries
2471 * to lock a page queued for write that is already locked.
2472 * we want to try sending rpcs from many objects, but we
2473 * don't want to spin failing with 0. */
2474 if (race_counter == 10)
2480 /* we're trying to queue a page in the osc so we're subject to the
2481 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2482 * If the osc's queued pages are already at that limit, then we want to sleep
2483 * until there is space in the osc's queue for us. We also may be waiting for
2484 * write credits from the OST if there are RPCs in flight that may return some
2485 * before we fall back to sync writes.
2487  * We need this to know that our allocation was granted in the presence of signals */
2488 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2492 client_obd_list_lock(&cli->cl_loi_list_lock);
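        /* we are done waiting if either our entry was removed from the
         * waiters list (space or grant was made available to us) or no
         * rpcs remain in flight, in which case nothing can grant us
         * space and we must give up and fall back to sync writes */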
2493 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2494 client_obd_list_unlock(&cli->cl_loi_list_lock);
2498 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2499 * grant or cache space. */
2500 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2501 struct osc_async_page *oap)
2503 struct osc_cache_waiter ocw;
2504 struct l_wait_info lwi = { 0 };
2508 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2509 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2510 cli->cl_dirty_max, obd_max_dirty_pages,
2511 cli->cl_lost_grant, cli->cl_avail_grant);
2513 /* force the caller to try sync io. this can jump the list
2514 * of queued writes and create a discontiguous rpc stream */
2515 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2516 loi->loi_ar.ar_force_sync)
2519 /* Hopefully normal case - cache space and write credits available */
2520 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2521 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2522 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2523 /* account for ourselves */
2524 osc_consume_write_grant(cli, &oap->oap_brw_page);
2528 /* Make sure that there are write rpcs in flight to wait for. This
2529 * is a little silly as this object may not have any pending but
2530 * other objects sure might. */
2531 if (cli->cl_w_in_flight) {
2532 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2533 cfs_waitq_init(&ocw.ocw_waitq);
2537 loi_list_maint(cli, loi);
2538 osc_check_rpcs(cli);
2539 client_obd_list_unlock(&cli->cl_loi_list_lock);
2541 CDEBUG(D_CACHE, "sleeping for cache space\n");
2542 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2544 client_obd_list_lock(&cli->cl_loi_list_lock);
2545 if (!list_empty(&ocw.ocw_entry)) {
2546 list_del(&ocw.ocw_entry);
2555 static int osc_reget_short_lock(struct obd_export *exp,
2556 struct lov_stripe_md *lsm,
2558 obd_off start, obd_off end,
2561 struct osc_async_page *oap = *res;
2566 spin_lock(&oap->oap_lock);
2567 rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2568 start, end, cookie);
2569 spin_unlock(&oap->oap_lock);
2574 static int osc_release_short_lock(struct obd_export *exp,
2575 struct lov_stripe_md *lsm, obd_off end,
2576 void *cookie, int rw)
2579 ldlm_lock_fast_release(cookie, rw);
2580 /* no error could have happened at this layer */
2584 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2585 struct lov_oinfo *loi, cfs_page_t *page,
2586 obd_off offset, struct obd_async_page_ops *ops,
2587 void *data, void **res, int nocache,
2588 struct lustre_handle *lockh)
2590 struct osc_async_page *oap;
2591 struct ldlm_res_id oid = {{0}};
2596 return size_round(sizeof(*oap));
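        /* presumably the size-query case: a caller can first ask how much
         * room the oap cookie needs (no page yet) and only then call again
         * with a real page to initialize an oap in that space.  This
         * two-phase usage is an assumption about the callers, not
         * something enforced here. */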
2599 oap->oap_magic = OAP_MAGIC;
2600 oap->oap_cli = &exp->exp_obd->u.cli;
2603 oap->oap_caller_ops = ops;
2604 oap->oap_caller_data = data;
2606 oap->oap_page = page;
2607 oap->oap_obj_off = offset;
2609 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2610 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2611 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2612 CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2614 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2616 spin_lock_init(&oap->oap_lock);
2618 /* If the page was marked as not cacheable, don't add it to any locks */
2620 oid.name[0] = loi->loi_id;
2621 oid.name[2] = loi->loi_gr;
2622 /* This is the only place where we can call cache_add_extent
2623    without oap_lock, because this page is locked now, and
2624    the lock we are adding it to is referenced, so it cannot lose
2625    any pages either. */
2626 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2631 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2635 struct osc_async_page *oap_from_cookie(void *cookie)
2637 struct osc_async_page *oap = cookie;
2638 if (oap->oap_magic != OAP_MAGIC)
2639 return ERR_PTR(-EINVAL);
2643 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2644 struct lov_oinfo *loi, void *cookie,
2645 int cmd, obd_off off, int count,
2646 obd_flag brw_flags, enum async_flags async_flags)
2648 struct client_obd *cli = &exp->exp_obd->u.cli;
2649 struct osc_async_page *oap;
2653 oap = oap_from_cookie(cookie);
2655 RETURN(PTR_ERR(oap));
2657 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2660 if (!list_empty(&oap->oap_pending_item) ||
2661 !list_empty(&oap->oap_urgent_item) ||
2662 !list_empty(&oap->oap_rpc_item))
2665 /* check if the file's owner/group is over quota */
2666 #ifdef HAVE_QUOTA_SUPPORT
2667 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2668 struct obd_async_page_ops *ops;
2675 ops = oap->oap_caller_ops;
2676 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2677 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2688 loi = lsm->lsm_oinfo[0];
2690 client_obd_list_lock(&cli->cl_loi_list_lock);
2693 oap->oap_page_off = off;
2694 oap->oap_count = count;
2695 oap->oap_brw_flags = brw_flags;
2696 oap->oap_async_flags = async_flags;
2698 if (cmd & OBD_BRW_WRITE) {
2699 rc = osc_enter_cache(cli, loi, oap);
2701 client_obd_list_unlock(&cli->cl_loi_list_lock);
2706 osc_oap_to_pending(oap);
2707 loi_list_maint(cli, loi);
2709 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2712 osc_check_rpcs(cli);
2713 client_obd_list_unlock(&cli->cl_loi_list_lock);
2718 /* aka (~was & now & flag), but this is clearer :) */
2719 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
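/* e.g. SETTING(0, ASYNC_READY, ASYNC_READY) is true (the flag is being newly
 * set), while SETTING(ASYNC_READY, ASYNC_READY, ASYNC_READY) is false (it
 * was already set before). */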
2721 static int osc_set_async_flags(struct obd_export *exp,
2722 struct lov_stripe_md *lsm,
2723 struct lov_oinfo *loi, void *cookie,
2724 obd_flag async_flags)
2726 struct client_obd *cli = &exp->exp_obd->u.cli;
2727 struct loi_oap_pages *lop;
2728 struct osc_async_page *oap;
2732 oap = oap_from_cookie(cookie);
2734 RETURN(PTR_ERR(oap));
2737  * bug 7311: OST-side locking is only supported by liblustre for now
2738  * (and liblustre never calls obd_set_async_flags(). I hope.); a generic
2739  * implementation has to handle the case where an OST-locked page was
2740  * picked up by, e.g., ->writepage().
2742 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2743 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2746 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2750 loi = lsm->lsm_oinfo[0];
2752 if (oap->oap_cmd & OBD_BRW_WRITE) {
2753 lop = &loi->loi_write_lop;
2755 lop = &loi->loi_read_lop;
2758 client_obd_list_lock(&cli->cl_loi_list_lock);
2760 if (list_empty(&oap->oap_pending_item))
2761 GOTO(out, rc = -EINVAL);
2763 if ((oap->oap_async_flags & async_flags) == async_flags)
2766 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2767 oap->oap_async_flags |= ASYNC_READY;
2769 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2770 if (list_empty(&oap->oap_rpc_item)) {
2771 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2772 loi_list_maint(cli, loi);
2776 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2777 oap->oap_async_flags);
2779 osc_check_rpcs(cli);
2780 client_obd_list_unlock(&cli->cl_loi_list_lock);
2784 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2785 struct lov_oinfo *loi,
2786 struct obd_io_group *oig, void *cookie,
2787 int cmd, obd_off off, int count,
2789 obd_flag async_flags)
2791 struct client_obd *cli = &exp->exp_obd->u.cli;
2792 struct osc_async_page *oap;
2793 struct loi_oap_pages *lop;
2797 oap = oap_from_cookie(cookie);
2799 RETURN(PTR_ERR(oap));
2801 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2804 if (!list_empty(&oap->oap_pending_item) ||
2805 !list_empty(&oap->oap_urgent_item) ||
2806 !list_empty(&oap->oap_rpc_item))
2810 loi = lsm->lsm_oinfo[0];
2812 client_obd_list_lock(&cli->cl_loi_list_lock);
2815 oap->oap_page_off = off;
2816 oap->oap_count = count;
2817 oap->oap_brw_flags = brw_flags;
2818 oap->oap_async_flags = async_flags;
2820 if (cmd & OBD_BRW_WRITE)
2821 lop = &loi->loi_write_lop;
2823 lop = &loi->loi_read_lop;
2825 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2826 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2828 rc = oig_add_one(oig, &oap->oap_occ);
2831 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2832 oap, oap->oap_page, rc);
2834 client_obd_list_unlock(&cli->cl_loi_list_lock);
2839 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2840 struct loi_oap_pages *lop, int cmd)
2842 struct list_head *pos, *tmp;
2843 struct osc_async_page *oap;
2845 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2846 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2847 list_del(&oap->oap_pending_item);
2848 osc_oap_to_pending(oap);
2850 loi_list_maint(cli, loi);
2853 static int osc_trigger_group_io(struct obd_export *exp,
2854 struct lov_stripe_md *lsm,
2855 struct lov_oinfo *loi,
2856 struct obd_io_group *oig)
2858 struct client_obd *cli = &exp->exp_obd->u.cli;
2862 loi = lsm->lsm_oinfo[0];
2864 client_obd_list_lock(&cli->cl_loi_list_lock);
2866 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2867 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2869 osc_check_rpcs(cli);
2870 client_obd_list_unlock(&cli->cl_loi_list_lock);
2875 static int osc_teardown_async_page(struct obd_export *exp,
2876 struct lov_stripe_md *lsm,
2877 struct lov_oinfo *loi, void *cookie)
2879 struct client_obd *cli = &exp->exp_obd->u.cli;
2880 struct loi_oap_pages *lop;
2881 struct osc_async_page *oap;
2885 oap = oap_from_cookie(cookie);
2887 RETURN(PTR_ERR(oap));
2890 loi = lsm->lsm_oinfo[0];
2892 if (oap->oap_cmd & OBD_BRW_WRITE) {
2893 lop = &loi->loi_write_lop;
2895 lop = &loi->loi_read_lop;
2898 client_obd_list_lock(&cli->cl_loi_list_lock);
2900 if (!list_empty(&oap->oap_rpc_item))
2901 GOTO(out, rc = -EBUSY);
2903 osc_exit_cache(cli, oap, 0);
2904 osc_wake_cache_waiters(cli);
2906 if (!list_empty(&oap->oap_urgent_item)) {
2907 list_del_init(&oap->oap_urgent_item);
2908 oap->oap_async_flags &= ~ASYNC_URGENT;
2910 if (!list_empty(&oap->oap_pending_item)) {
2911 list_del_init(&oap->oap_pending_item);
2912 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2914 loi_list_maint(cli, loi);
2915 cache_remove_extent(cli->cl_cache, oap);
2917 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2919 client_obd_list_unlock(&cli->cl_loi_list_lock);
2923 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2924 struct ldlm_lock_desc *new, void *data,
2927 struct lustre_handle lockh = { 0 };
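        /* a "data" value inside the first page of the address space is
         * almost certainly stale integer garbage rather than a valid
         * pointer, so complain instead of dereferencing it */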
2931 if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2932 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2937 case LDLM_CB_BLOCKING:
2938 ldlm_lock2handle(lock, &lockh);
2939 rc = ldlm_cli_cancel(&lockh);
2941 CERROR("ldlm_cli_cancel failed: %d\n", rc);
2943 case LDLM_CB_CANCELING: {
2945 ldlm_lock2handle(lock, &lockh);
2946 /* This lock wasn't granted, don't try to do anything */
2947 if (lock->l_req_mode != lock->l_granted_mode)
2950 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
2953 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
2954 lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
2955 lock, new, data, flag);
2964 EXPORT_SYMBOL(osc_extent_blocking_cb);
2966 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2969 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2972 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2975 lock_res_and_lock(lock);
2976 #if defined (__KERNEL__) && defined (__linux__)
2977 /* Liang XXX: Darwin and Winnt checking should be added */
2978 if (lock->l_ast_data && lock->l_ast_data != data) {
2979 struct inode *new_inode = data;
2980 struct inode *old_inode = lock->l_ast_data;
2981 if (!(old_inode->i_state & I_FREEING))
2982 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2983 LASSERTF(old_inode->i_state & I_FREEING,
2984 "Found existing inode %p/%lu/%u state %lu in lock: "
2985 "setting data to %p/%lu/%u\n", old_inode,
2986 old_inode->i_ino, old_inode->i_generation,
2988 new_inode, new_inode->i_ino, new_inode->i_generation);
2991 lock->l_ast_data = data;
2992 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2993 unlock_res_and_lock(lock);
2994 LDLM_LOCK_PUT(lock);
2997 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2998 ldlm_iterator_t replace, void *data)
3000 struct ldlm_res_id res_id = { .name = {0} };
3001 struct obd_device *obd = class_exp2obd(exp);
3003 res_id.name[0] = lsm->lsm_object_id;
3004 res_id.name[2] = lsm->lsm_object_gr;
3006 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3010 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3011 struct obd_info *oinfo, int intent, int rc)
3016 /* The request was created before ldlm_cli_enqueue call. */
3017 if (rc == ELDLM_LOCK_ABORTED) {
3018 struct ldlm_reply *rep;
3019 rep = req_capsule_server_get(&req->rq_pill,
3022 LASSERT(rep != NULL);
3023 if (rep->lock_policy_res1)
3024 rc = rep->lock_policy_res1;
3028 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3029 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3030 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3031 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3032 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3036 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3038 /* Call the update callback. */
3039 rc = oinfo->oi_cb_up(oinfo, rc);
3043 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3044 struct osc_enqueue_args *aa, int rc)
3046 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3047 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3048 struct ldlm_lock *lock;
3050 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3052 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3054 /* Complete obtaining the lock procedure. */
3055 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3057 &aa->oa_oi->oi_flags,
3058 &lsm->lsm_oinfo[0]->loi_lvb,
3059 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3060 lustre_swab_ost_lvb,
3061 aa->oa_oi->oi_lockh, rc);
3063 /* Complete osc stuff. */
3064 rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3066 /* Release the lock for async request. */
3067 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3068 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3070 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3071 aa->oa_oi->oi_lockh, req, aa);
3072 LDLM_LOCK_PUT(lock);
3076 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
3077  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
3078  * with other synchronous requests; however, keeping some locks while trying to
3079  * obtain others may take a considerable amount of time in case of ost failure,
3080  * and when a client does not release a lock that other sync requests are
3081  * waiting for, that client is excluded from the cluster. Such scenarios make
3082  * life difficult, so release locks just after they are obtained. */
3083 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3084 struct ldlm_enqueue_info *einfo,
3085 struct ptlrpc_request_set *rqset)
3087 struct ldlm_res_id res_id = { .name = {0} };
3088 struct obd_device *obd = exp->exp_obd;
3089 struct ptlrpc_request *req = NULL;
3090 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3095 res_id.name[0] = oinfo->oi_md->lsm_object_id;
3096 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
3098 /* Filesystem lock extents are extended to page boundaries so that
3099 * dealing with the page cache is a little smoother. */
3100 oinfo->oi_policy.l_extent.start -=
3101 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3102 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
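        /* e.g. with 4KB pages (so ~CFS_PAGE_MASK == 0xfff), a request for
         * bytes [5000, 6000] is widened to [4096, 8191], i.e. the enqueued
         * extent covers whole pages */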
3104 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3107 /* Next, search for already existing extent locks that will cover us */
3108 /* If we're trying to read, we also search for an existing PW lock. The
3109 * VFS and page cache already protect us locally, so lots of readers/
3110 * writers can share a single PW lock.
3112 * There are problems with conversion deadlocks, so instead of
3113 * converting a read lock to a write lock, we'll just enqueue a new
3116 * At some point we should cancel the read lock instead of making them
3117 * send us a blocking callback, but there are problems with canceling
3118 * locks out from other users right now, too. */
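        /* in other words a PR request may be matched against both PR and PW
         * locks below: an existing PW lock covering the extent already
         * protects readers */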
3119 mode = einfo->ei_mode;
3120 if (einfo->ei_mode == LCK_PR)
3122 mode = ldlm_lock_match(obd->obd_namespace,
3123 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3124 einfo->ei_type, &oinfo->oi_policy, mode,
3127 /* addref the lock only if not async requests and PW lock is
3128 * matched whereas we asked for PR. */
3129 if (!rqset && einfo->ei_mode != mode)
3130 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3131 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3134 /* I would like to be able to ASSERT here that rss <=
3135 * kms, but I can't, for reasons which are explained in
3139 /* We already have a lock, and it's referenced */
3140 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3142 /* For async requests, decref the lock. */
3143 if (einfo->ei_mode != mode)
3144 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3146 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3153 CFS_LIST_HEAD(cancels);
3154 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3155 &RQF_LDLM_ENQUEUE_LVB);
3159 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3163 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3164 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3165 ptlrpc_request_set_replen(req);
3168 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3169 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3171 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3172 &oinfo->oi_policy, &oinfo->oi_flags,
3173 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3174 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3175 lustre_swab_ost_lvb, oinfo->oi_lockh,
3179 struct osc_enqueue_args *aa;
3180 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3181 aa = (struct osc_enqueue_args *)&req->rq_async_args;
3186 req->rq_interpret_reply = osc_enqueue_interpret;
3187 ptlrpc_set_add_req(rqset, req);
3188 } else if (intent) {
3189 ptlrpc_req_finished(req);
3194 rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3196 ptlrpc_req_finished(req);
3201 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3202 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3203 int *flags, void *data, struct lustre_handle *lockh)
3205 struct ldlm_res_id res_id = { .name = {0} };
3206 struct obd_device *obd = exp->exp_obd;
3207 int lflags = *flags;
3211 res_id.name[0] = lsm->lsm_object_id;
3212 res_id.name[2] = lsm->lsm_object_gr;
3214 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3217 /* Filesystem lock extents are extended to page boundaries so that
3218 * dealing with the page cache is a little smoother */
3219 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3220 policy->l_extent.end |= ~CFS_PAGE_MASK;
3222 /* Next, search for already existing extent locks that will cover us */
3223 /* If we're trying to read, we also search for an existing PW lock. The
3224 * VFS and page cache already protect us locally, so lots of readers/
3225 * writers can share a single PW lock. */
3229 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3230 &res_id, type, policy, rc, lockh);
3232 osc_set_data_with_check(lockh, data, lflags);
3233 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3234 ldlm_lock_addref(lockh, LCK_PR);
3235 ldlm_lock_decref(lockh, LCK_PW);
3242 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3243 __u32 mode, struct lustre_handle *lockh)
3247 if (unlikely(mode == LCK_GROUP))
3248 ldlm_lock_decref_and_cancel(lockh, mode);
3250 ldlm_lock_decref(lockh, mode);
3255 static int osc_cancel_unused(struct obd_export *exp,
3256 struct lov_stripe_md *lsm, int flags,
3259 struct obd_device *obd = class_exp2obd(exp);
3260 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3263 res_id.name[0] = lsm->lsm_object_id;
3264 res_id.name[2] = lsm->lsm_object_gr;
3268 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3271 static int osc_join_lru(struct obd_export *exp,
3272 struct lov_stripe_md *lsm, int join)
3274 struct obd_device *obd = class_exp2obd(exp);
3275 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3278 res_id.name[0] = lsm->lsm_object_id;
3279 res_id.name[2] = lsm->lsm_object_gr;
3283 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3286 static int osc_statfs_interpret(struct ptlrpc_request *req,
3287 struct osc_async_args *aa, int rc)
3289 struct obd_statfs *msfs;
3295 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3297 GOTO(out, rc = -EPROTO);
3300 *aa->aa_oi->oi_osfs = *msfs;
3302 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3306 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3307 __u64 max_age, struct ptlrpc_request_set *rqset)
3309 struct ptlrpc_request *req;
3310 struct osc_async_args *aa;
3314 /* We could possibly pass max_age in the request (as an absolute
3315 * timestamp or a "seconds.usec ago") so the target can avoid doing
3316 * extra calls into the filesystem if that isn't necessary (e.g.
3317 * during mount that would help a bit). Having relative timestamps
3318 * is not so great if request processing is slow, while absolute
3319 * timestamps are not ideal because they need time synchronization. */
3320 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3324 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3326 ptlrpc_request_free(req);
3329 ptlrpc_request_set_replen(req);
3330 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3331 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3332 /* procfs requests should not be delayed or resent, to avoid deadlock */
3333 req->rq_no_resend = 1;
3334 req->rq_no_delay = 1;
3337 req->rq_interpret_reply = osc_statfs_interpret;
3338 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3339 aa = (struct osc_async_args *)&req->rq_async_args;
3342 ptlrpc_set_add_req(rqset, req);
3346 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3347 __u64 max_age, __u32 flags)
3349 struct obd_statfs *msfs;
3350 struct ptlrpc_request *req;
3354 /* We could possibly pass max_age in the request (as an absolute
3355 * timestamp or a "seconds.usec ago") so the target can avoid doing
3356 * extra calls into the filesystem if that isn't necessary (e.g.
3357 * during mount that would help a bit). Having relative timestamps
3358 * is not so great if request processing is slow, while absolute
3359 * timestamps are not ideal because they need time synchronization. */
3360 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3364 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3366 ptlrpc_request_free(req);
3369 ptlrpc_request_set_replen(req);
3370 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3372 if (flags & OBD_STATFS_NODELAY) {
3373 /* procfs requests should not be delayed or resent, to avoid deadlock */
3374 req->rq_no_resend = 1;
3375 req->rq_no_delay = 1;
3378 rc = ptlrpc_queue_wait(req);
3382 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3384 GOTO(out, rc = -EPROTO);
3391 ptlrpc_req_finished(req);
3395 /* Retrieve object striping information.
3397  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3398  * the maximum number of OST indices which will fit in the user buffer.
3399  * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
3401 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3403 struct lov_user_md lum, *lumk;
3404 int rc = 0, lum_size;
3410 if (copy_from_user(&lum, lump, sizeof(lum)))
3413 if (lum.lmm_magic != LOV_USER_MAGIC)
3416 if (lum.lmm_stripe_count > 0) {
3417 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3418 OBD_ALLOC(lumk, lum_size);
3422 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3423 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3425 lum_size = sizeof(lum);
3429 lumk->lmm_object_id = lsm->lsm_object_id;
3430 lumk->lmm_object_gr = lsm->lsm_object_gr;
3431 lumk->lmm_stripe_count = 1;
3433 if (copy_to_user(lump, lumk, lum_size))
3437 OBD_FREE(lumk, lum_size);
3443 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3444 void *karg, void *uarg)
3446 struct obd_device *obd = exp->exp_obd;
3447 struct obd_ioctl_data *data = karg;
3451 if (!try_module_get(THIS_MODULE)) {
3452 CERROR("Can't get module. Is it alive?");
3456 case OBD_IOC_LOV_GET_CONFIG: {
3458 struct lov_desc *desc;
3459 struct obd_uuid uuid;
3463 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3464 GOTO(out, err = -EINVAL);
3466 data = (struct obd_ioctl_data *)buf;
3468 if (sizeof(*desc) > data->ioc_inllen1) {
3469 obd_ioctl_freedata(buf, len);
3470 GOTO(out, err = -EINVAL);
3473 if (data->ioc_inllen2 < sizeof(uuid)) {
3474 obd_ioctl_freedata(buf, len);
3475 GOTO(out, err = -EINVAL);
3478 desc = (struct lov_desc *)data->ioc_inlbuf1;
3479 desc->ld_tgt_count = 1;
3480 desc->ld_active_tgt_count = 1;
3481 desc->ld_default_stripe_count = 1;
3482 desc->ld_default_stripe_size = 0;
3483 desc->ld_default_stripe_offset = 0;
3484 desc->ld_pattern = 0;
3485 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3487 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3489 err = copy_to_user((void *)uarg, buf, len);
3492 obd_ioctl_freedata(buf, len);
3495 case LL_IOC_LOV_SETSTRIPE:
3496 err = obd_alloc_memmd(exp, karg);
3500 case LL_IOC_LOV_GETSTRIPE:
3501 err = osc_getstripe(karg, uarg);
3503 case OBD_IOC_CLIENT_RECOVER:
3504 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3509 case IOC_OSC_SET_ACTIVE:
3510 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3513 case OBD_IOC_POLL_QUOTACHECK:
3514 err = lquota_poll_check(quota_interface, exp,
3515 (struct if_quotacheck *)karg);
3518 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3519 cmd, cfs_curproc_comm());
3520 GOTO(out, err = -ENOTTY);
3523 module_put(THIS_MODULE);
3527 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3528 void *key, __u32 *vallen, void *val)
3531 if (!vallen || !val)
3534 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3535 __u32 *stripe = val;
3536 *vallen = sizeof(*stripe);
3539 } else if (KEY_IS(KEY_LAST_ID)) {
3540 struct ptlrpc_request *req;
3545 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3546 &RQF_OST_GET_INFO_LAST_ID);
3550 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3551 RCL_CLIENT, keylen);
3552 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3554 ptlrpc_request_free(req);
3558 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3559 memcpy(tmp, key, keylen);
3561 ptlrpc_request_set_replen(req);
3562 rc = ptlrpc_queue_wait(req);
3566 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3568 GOTO(out, rc = -EPROTO);
3570 *((obd_id *)val) = *reply;
3572 ptlrpc_req_finished(req);
3578 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3581 struct llog_ctxt *ctxt;
3582 struct obd_import *imp = req->rq_import;
3588 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3591 rc = llog_initiator_connect(ctxt);
3593 CERROR("cannot establish connection for "
3594 "ctxt %p: %d\n", ctxt, rc);
3597 llog_ctxt_put(ctxt);
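        /* make the import pingable so this (MDS-side) osc keeps its
         * connection to the OST alive with periodic pings */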
3598 spin_lock(&imp->imp_lock);
3599 imp->imp_server_timeout = 1;
3600 imp->imp_pingable = 1;
3601 spin_unlock(&imp->imp_lock);
3602 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3607 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3608 void *key, obd_count vallen, void *val,
3609 struct ptlrpc_request_set *set)
3611 struct ptlrpc_request *req;
3612 struct obd_device *obd = exp->exp_obd;
3613 struct obd_import *imp = class_exp2cliimp(exp);
3618 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3620 if (KEY_IS(KEY_NEXT_ID)) {
3621 if (vallen != sizeof(obd_id))
3625 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3626 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3627 exp->exp_obd->obd_name,
3628 obd->u.cli.cl_oscc.oscc_next_id);
3633 if (KEY_IS(KEY_UNLINKED)) {
3634 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3635 spin_lock(&oscc->oscc_lock);
3636 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3637 spin_unlock(&oscc->oscc_lock);
3641 if (KEY_IS(KEY_INIT_RECOV)) {
3642 if (vallen != sizeof(int))
3644 spin_lock(&imp->imp_lock);
3645 imp->imp_initial_recov = *(int *)val;
3646 spin_unlock(&imp->imp_lock);
3647 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3648 exp->exp_obd->obd_name,
3649 imp->imp_initial_recov);
3653 if (KEY_IS(KEY_CHECKSUM)) {
3654 if (vallen != sizeof(int))
3656 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3660 if (KEY_IS(KEY_FLUSH_CTX)) {
3661 sptlrpc_import_flush_my_ctx(imp);
3668 /* We pass all other commands directly to OST. Since nobody calls osc
3669    methods directly and everybody is supposed to go through LOV, we
3670    assume LOV has checked for invalid values for us.
3671    The only recognised values so far are evict_by_nid and mds_conn.
3672    Even if something bad goes through, we'd get a -EINVAL from OST
3676 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3680 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3681 RCL_CLIENT, keylen);
3682 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3683 RCL_CLIENT, vallen);
3684 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3686 ptlrpc_request_free(req);
3690 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3691 memcpy(tmp, key, keylen);
3692 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3693 memcpy(tmp, val, vallen);
3695 if (KEY_IS(KEY_MDS_CONN)) {
3696 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3698 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3699 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3700 LASSERT(oscc->oscc_oa.o_gr > 0);
3701 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3704 ptlrpc_request_set_replen(req);
3705 ptlrpc_set_add_req(set, req);
3706 ptlrpc_check_set(set);
3712 static struct llog_operations osc_size_repl_logops = {
3713 lop_cancel: llog_obd_repl_cancel
3716 static struct llog_operations osc_mds_ost_orig_logops;
3717 static int osc_llog_init(struct obd_device *obd, int group,
3718 struct obd_device *tgt, int count,
3719 struct llog_catid *catid, struct obd_uuid *uuid)
3723 LASSERT(group == OBD_LLOG_GROUP);
3724 spin_lock(&obd->obd_dev_lock);
3725 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3726 osc_mds_ost_orig_logops = llog_lvfs_ops;
3727 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3728 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3729 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3730 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3732 spin_unlock(&obd->obd_dev_lock);
3734 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3735 &catid->lci_logid, &osc_mds_ost_orig_logops);
3737 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3741 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3742 NULL, &osc_size_repl_logops);
3744 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3747 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3748 obd->obd_name, tgt->obd_name, count, catid, rc);
3749 CERROR("logid "LPX64":0x%x\n",
3750 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3755 static int osc_llog_finish(struct obd_device *obd, int count)
3757 struct llog_ctxt *ctxt;
3758 int rc = 0, rc2 = 0;
3761 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3763 rc = llog_cleanup(ctxt);
3765 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3767 rc2 = llog_cleanup(ctxt);
3774 static int osc_reconnect(const struct lu_env *env,
3775 struct obd_export *exp, struct obd_device *obd,
3776 struct obd_uuid *cluuid,
3777 struct obd_connect_data *data)
3779 struct client_obd *cli = &obd->u.cli;
3781 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3784 client_obd_list_lock(&cli->cl_loi_list_lock);
3785 data->ocd_grant = cli->cl_avail_grant ?:
3786 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
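                /* i.e. if no grant survived, ask for two full rpcs worth;
                 * e.g. with 256 pages per rpc and 4KB pages that is
                 * 2 * 256 * 4096 = 2MB of grant */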
3787 lost_grant = cli->cl_lost_grant;
3788 cli->cl_lost_grant = 0;
3789 client_obd_list_unlock(&cli->cl_loi_list_lock);
3791 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3792 "cl_lost_grant: %ld\n", data->ocd_grant,
3793 cli->cl_avail_grant, lost_grant);
3794 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3795 " ocd_grant: %d\n", data->ocd_connect_flags,
3796 data->ocd_version, data->ocd_grant);
3802 static int osc_disconnect(struct obd_export *exp)
3804 struct obd_device *obd = class_exp2obd(exp);
3805 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3808 if (obd->u.cli.cl_conn_count == 1)
3809 /* flush any remaining cancel messages out to the target */
3810 llog_sync(ctxt, exp);
3812 llog_ctxt_put(ctxt);
3814 rc = client_disconnect_export(exp);
3818 static int osc_import_event(struct obd_device *obd,
3819 struct obd_import *imp,
3820 enum obd_import_event event)
3822 struct client_obd *cli;
3826 LASSERT(imp->imp_obd == obd);
3829 case IMP_EVENT_DISCON: {
3830 /* Only do this on the MDS OSCs */
3831 if (imp->imp_server_timeout) {
3832 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3834 spin_lock(&oscc->oscc_lock);
3835 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3836 spin_unlock(&oscc->oscc_lock);
3839 client_obd_list_lock(&cli->cl_loi_list_lock);
3840 cli->cl_avail_grant = 0;
3841 cli->cl_lost_grant = 0;
3842 client_obd_list_unlock(&cli->cl_loi_list_lock);
3845 case IMP_EVENT_INACTIVE: {
3846 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3849 case IMP_EVENT_INVALIDATE: {
3850 struct ldlm_namespace *ns = obd->obd_namespace;
3854 client_obd_list_lock(&cli->cl_loi_list_lock);
3855 /* all pages go to failing rpcs due to the invalid import */
3856 osc_check_rpcs(cli);
3857 client_obd_list_unlock(&cli->cl_loi_list_lock);
3859 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3863 case IMP_EVENT_ACTIVE: {
3864 /* Only do this on the MDS OSCs */
3865 if (imp->imp_server_timeout) {
3866 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3868 spin_lock(&oscc->oscc_lock);
3869 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3870 spin_unlock(&oscc->oscc_lock);
3872 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3875 case IMP_EVENT_OCD: {
3876 struct obd_connect_data *ocd = &imp->imp_connect_data;
3878 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3879 osc_init_grant(&obd->u.cli, ocd);
3882 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3883 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3885 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3889 CERROR("Unknown import event %d\n", event);
3895 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3901 rc = ptlrpcd_addref();
3905 rc = client_obd_setup(obd, lcfg);
3909 struct lprocfs_static_vars lvars = { 0 };
3910 struct client_obd *cli = &obd->u.cli;
3912 lprocfs_osc_init_vars(&lvars);
3913 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3914 lproc_osc_attach_seqstat(obd);
3915 sptlrpc_lprocfs_cliobd_attach(obd);
3916 ptlrpc_lprocfs_register_obd(obd);
3920 /* We need to allocate a few more requests, because
3921    brw_interpret tries to create new requests before freeing
3922    previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
3923    but I'm afraid that might waste too much RAM in practice,
3924    so adding just 2 is my guess and should still work. */
3925 cli->cl_import->imp_rq_pool =
3926 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3928 ptlrpc_add_rqs_to_pool);
3929 cli->cl_cache = cache_create(obd);
3930 if (!cli->cl_cache) {
3939 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3945 case OBD_CLEANUP_EARLY: {
3946 struct obd_import *imp;
3947 imp = obd->u.cli.cl_import;
3948 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3949 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3950 ptlrpc_deactivate_import(imp);
3951 spin_lock(&imp->imp_lock);
3952 imp->imp_pingable = 0;
3953 spin_unlock(&imp->imp_lock);
3956 case OBD_CLEANUP_EXPORTS: {
3957 /* If we set up but never connected, the
3958 client import will not have been cleaned. */
3959 if (obd->u.cli.cl_import) {
3960 struct obd_import *imp;
3961 imp = obd->u.cli.cl_import;
3962 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3964 ptlrpc_invalidate_import(imp);
3965 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3966 class_destroy_import(imp);
3967 obd->u.cli.cl_import = NULL;
3971 case OBD_CLEANUP_SELF_EXP:
3972 rc = obd_llog_finish(obd, 0);
3974 CERROR("failed to cleanup llogging subsystems\n");
3976 case OBD_CLEANUP_OBD:
3982 int osc_cleanup(struct obd_device *obd)
3984 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3988 ptlrpc_lprocfs_unregister_obd(obd);
3989 lprocfs_obd_cleanup(obd);
3991 spin_lock(&oscc->oscc_lock);
3992 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3993 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3994 spin_unlock(&oscc->oscc_lock);
3996 /* free memory of osc quota cache */
3997 lquota_cleanup(quota_interface, obd);
3999 cache_destroy(obd->u.cli.cl_cache);
4000 rc = client_obd_cleanup(obd);
4006 static int osc_register_page_removal_cb(struct obd_export *exp,
4007 obd_page_removal_cb_t func,
4008 obd_pin_extent_cb pin_cb)
4010 return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4014 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4015 obd_page_removal_cb_t func)
4017 return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4020 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4021 obd_lock_cancel_cb cb)
4023 LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4025 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4029 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4030 obd_lock_cancel_cb cb)
4032 if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4033 CERROR("Unregistering cancel cb %p, while only %p was "
4035 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4039 exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4043 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4045 struct lustre_cfg *lcfg = buf;
4046 struct lprocfs_static_vars lvars = { 0 };
4049 lprocfs_osc_init_vars(&lvars);
4051 switch (lcfg->lcfg_command) {
4052 case LCFG_SPTLRPC_CONF:
4053 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4056 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4064 struct obd_ops osc_obd_ops = {
4065 .o_owner = THIS_MODULE,
4066 .o_setup = osc_setup,
4067 .o_precleanup = osc_precleanup,
4068 .o_cleanup = osc_cleanup,
4069 .o_add_conn = client_import_add_conn,
4070 .o_del_conn = client_import_del_conn,
4071 .o_connect = client_connect_import,
4072 .o_reconnect = osc_reconnect,
4073 .o_disconnect = osc_disconnect,
4074 .o_statfs = osc_statfs,
4075 .o_statfs_async = osc_statfs_async,
4076 .o_packmd = osc_packmd,
4077 .o_unpackmd = osc_unpackmd,
4078 .o_precreate = osc_precreate,
4079 .o_create = osc_create,
4080 .o_destroy = osc_destroy,
4081 .o_getattr = osc_getattr,
4082 .o_getattr_async = osc_getattr_async,
4083 .o_setattr = osc_setattr,
4084 .o_setattr_async = osc_setattr_async,
4086 .o_brw_async = osc_brw_async,
4087 .o_prep_async_page = osc_prep_async_page,
4088 .o_reget_short_lock = osc_reget_short_lock,
4089 .o_release_short_lock = osc_release_short_lock,
4090 .o_queue_async_io = osc_queue_async_io,
4091 .o_set_async_flags = osc_set_async_flags,
4092 .o_queue_group_io = osc_queue_group_io,
4093 .o_trigger_group_io = osc_trigger_group_io,
4094 .o_teardown_async_page = osc_teardown_async_page,
4095 .o_punch = osc_punch,
4097 .o_enqueue = osc_enqueue,
4098 .o_match = osc_match,
4099 .o_change_cbdata = osc_change_cbdata,
4100 .o_cancel = osc_cancel,
4101 .o_cancel_unused = osc_cancel_unused,
4102 .o_join_lru = osc_join_lru,
4103 .o_iocontrol = osc_iocontrol,
4104 .o_get_info = osc_get_info,
4105 .o_set_info_async = osc_set_info_async,
4106 .o_import_event = osc_import_event,
4107 .o_llog_init = osc_llog_init,
4108 .o_llog_finish = osc_llog_finish,
4109 .o_process_config = osc_process_config,
4110 .o_register_page_removal_cb = osc_register_page_removal_cb,
4111 .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4112 .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4113 .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4115 int __init osc_init(void)
4117 struct lprocfs_static_vars lvars = { 0 };
4121 lprocfs_osc_init_vars(&lvars);
4123 request_module("lquota");
4124 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4125 lquota_init(quota_interface);
4126 init_obd_quota_ops(quota_interface, &osc_obd_ops);
4128 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4129 LUSTRE_OSC_NAME, NULL);
4131 if (quota_interface)
4132 PORTAL_SYMBOL_PUT(osc_quota_interface);
4140 static void /*__exit*/ osc_exit(void)
4142 lquota_exit(quota_interface);
4143 if (quota_interface)
4144 PORTAL_SYMBOL_PUT(osc_quota_interface);
4146 class_unregister_type(LUSTRE_OSC_NAME);
4149 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4150 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4151 MODULE_LICENSE("GPL");
4153 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);