1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table).
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71 struct lov_stripe_md *lsm)
76 lmm_size = sizeof(**lmmp);
81 OBD_FREE(*lmmp, lmm_size);
87 OBD_ALLOC(*lmmp, lmm_size);
93 LASSERT(lsm->lsm_object_id);
94 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
100 /* Unpack OSC object metadata from disk storage (LE byte order). */
101 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
102 struct lov_mds_md *lmm, int lmm_bytes)
108 if (lmm_bytes < sizeof (*lmm)) {
109 CERROR("lov_mds_md too small: %d, need %d\n",
110 lmm_bytes, (int)sizeof(*lmm));
113 /* XXX LOV_MAGIC etc check? */
115 if (lmm->lmm_object_id == 0) {
116 CERROR("lov_mds_md: zero lmm_object_id\n");
121 lsm_size = lov_stripe_md_size(1);
125 if (*lsmp != NULL && lmm == NULL) {
126 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
127 OBD_FREE(*lsmp, lsm_size);
133 OBD_ALLOC(*lsmp, lsm_size);
136 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
137 if ((*lsmp)->lsm_oinfo[0] == NULL) {
138 OBD_FREE(*lsmp, lsm_size);
141 loi_init((*lsmp)->lsm_oinfo[0]);
145 /* XXX zero *lsmp? */
146 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
147 LASSERT((*lsmp)->lsm_object_id);
150 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
155 static int osc_getattr_interpret(struct ptlrpc_request *req,
156 struct osc_async_args *aa, int rc)
158 struct ost_body *body;
164 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
165 lustre_swab_ost_body);
167 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
168 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
170 /* This should really be sent by the OST */
171 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
172 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
174 CERROR("can't unpack ost_body\n");
176 aa->aa_oi->oi_oa->o_valid = 0;
179 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
183 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
184 struct ptlrpc_request_set *set)
186 struct ptlrpc_request *req;
187 struct ost_body *body;
188 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
189 struct osc_async_args *aa;
192 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
193 OST_GETATTR, 2, size,NULL);
197 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
198 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
200 ptlrpc_req_set_repsize(req, 2, size);
201 req->rq_interpret_reply = osc_getattr_interpret;
203 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
204 aa = (struct osc_async_args *)&req->rq_async_args;
207 ptlrpc_set_add_req(set, req);
211 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
213 struct ptlrpc_request *req;
214 struct ost_body *body;
215 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
218 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
219 OST_GETATTR, 2, size, NULL);
223 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
224 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
226 ptlrpc_req_set_repsize(req, 2, size);
228 rc = ptlrpc_queue_wait(req);
230 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
234 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
235 lustre_swab_ost_body);
237 CERROR ("can't unpack ost_body\n");
238 GOTO (out, rc = -EPROTO);
241 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
242 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
244 /* This should really be sent by the OST */
245 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
246 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
250 ptlrpc_req_finished(req);
254 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
255 struct obd_trans_info *oti)
257 struct ptlrpc_request *req;
258 struct ost_body *body;
259 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
262 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
263 OST_SETATTR, 2, size, NULL);
267 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
268 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
270 ptlrpc_req_set_repsize(req, 2, size);
272 rc = ptlrpc_queue_wait(req);
276 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
277 lustre_swab_ost_body);
279 GOTO(out, rc = -EPROTO);
281 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
285 ptlrpc_req_finished(req);
289 static int osc_setattr_interpret(struct ptlrpc_request *req,
290 struct osc_async_args *aa, int rc)
292 struct ost_body *body;
298 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
299 lustre_swab_ost_body);
301 CERROR("can't unpack ost_body\n");
302 GOTO(out, rc = -EPROTO);
305 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
307 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
311 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
312 struct obd_trans_info *oti,
313 struct ptlrpc_request_set *rqset)
315 struct ptlrpc_request *req;
316 struct ost_body *body;
317 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
318 struct osc_async_args *aa;
321 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
322 OST_SETATTR, 2, size, NULL);
326 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
328 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
330 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
331 sizeof(*oti->oti_logcookies));
334 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
335 ptlrpc_req_set_repsize(req, 2, size);
336 /* do mds to ost setattr asynchronously */
338 /* Do not wait for response. */
339 ptlrpcd_add_req(req);
341 req->rq_interpret_reply = osc_setattr_interpret;
343 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
344 aa = (struct osc_async_args *)&req->rq_async_args;
347 ptlrpc_set_add_req(rqset, req);
353 int osc_real_create(struct obd_export *exp, struct obdo *oa,
354 struct lov_stripe_md **ea, struct obd_trans_info *oti)
356 struct ptlrpc_request *req;
357 struct ost_body *body;
358 struct lov_stripe_md *lsm;
359 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
367 rc = obd_alloc_memmd(exp, &lsm);
372 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
373 OST_CREATE, 2, size, NULL);
375 GOTO(out, rc = -ENOMEM);
377 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
378 memcpy(&body->oa, oa, sizeof(body->oa));
380 ptlrpc_req_set_repsize(req, 2, size);
381 if (oa->o_valid & OBD_MD_FLINLINE) {
382 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
383 oa->o_flags == OBD_FL_DELORPHAN);
385 "delorphan from OST integration");
386 /* Don't resend the delorphan req */
387 req->rq_no_resend = req->rq_no_delay = 1;
390 rc = ptlrpc_queue_wait(req);
394 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
395 lustre_swab_ost_body);
397 CERROR ("can't unpack ost_body\n");
398 GOTO (out_req, rc = -EPROTO);
401 memcpy(oa, &body->oa, sizeof(*oa));
403 /* This should really be sent by the OST */
404 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
405 oa->o_valid |= OBD_MD_FLBLKSZ;
407 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
408 * have valid lsm_oinfo data structs, so don't go touching that.
409 * This needs to be fixed in a big way.
411 lsm->lsm_object_id = oa->o_id;
415 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
417 if (oa->o_valid & OBD_MD_FLCOOKIE) {
418 if (!oti->oti_logcookies)
419 oti_alloc_cookies(oti, 1);
420 memcpy(oti->oti_logcookies, obdo_logcookie(oa),
421 sizeof(oti->oti_onecookie));
425 CDEBUG(D_HA, "transno: "LPD64"\n",
426 lustre_msg_get_transno(req->rq_repmsg));
429 ptlrpc_req_finished(req);
432 obd_free_memmd(exp, &lsm);
436 static int osc_punch_interpret(struct ptlrpc_request *req,
437 struct osc_async_args *aa, int rc)
439 struct ost_body *body;
445 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
446 lustre_swab_ost_body);
448 CERROR ("can't unpack ost_body\n");
449 GOTO(out, rc = -EPROTO);
452 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
454 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
458 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
459 struct obd_trans_info *oti,
460 struct ptlrpc_request_set *rqset)
462 struct ptlrpc_request *req;
463 struct osc_async_args *aa;
464 struct ost_body *body;
465 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
473 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
474 OST_PUNCH, 2, size, NULL);
478 /* FIXME bug 249. Also see bug 7198 */
479 if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
480 OBD_CONNECT_REQPORTAL)
481 req->rq_request_portal = OST_IO_PORTAL;
483 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
484 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
486 /* overload the size and blocks fields in the oa with start/end */
487 body->oa.o_size = oinfo->oi_policy.l_extent.start;
488 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
489 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
491 ptlrpc_req_set_repsize(req, 2, size);
493 req->rq_interpret_reply = osc_punch_interpret;
494 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
495 aa = (struct osc_async_args *)&req->rq_async_args;
497 ptlrpc_set_add_req(rqset, req);
502 static int osc_sync(struct obd_export *exp, struct obdo *oa,
503 struct lov_stripe_md *md, obd_size start, obd_size end)
505 struct ptlrpc_request *req;
506 struct ost_body *body;
507 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
515 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
516 OST_SYNC, 2, size, NULL);
520 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
521 memcpy(&body->oa, oa, sizeof(*oa));
523 /* overload the size and blocks fields in the oa with start/end */
524 body->oa.o_size = start;
525 body->oa.o_blocks = end;
526 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
528 ptlrpc_req_set_repsize(req, 2, size);
530 rc = ptlrpc_queue_wait(req);
534 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
535 lustre_swab_ost_body);
537 CERROR ("can't unpack ost_body\n");
538 GOTO (out, rc = -EPROTO);
541 memcpy(oa, &body->oa, sizeof(*oa));
545 ptlrpc_req_finished(req);
549 /* Destroy requests can always be async on the client, and we don't even really
550 * care about the return code since the client cannot do anything at all about
552 * When the MDS is unlinking a filename, it saves the file objects into a
553 * recovery llog, and these object records are cancelled when the OST reports
554 * they were destroyed and sync'd to disk (i.e. transaction committed).
555 * If the client dies, or the OST is down when the object should be destroyed,
556 * the records are not cancelled, and when the OST reconnects to the MDS next,
557 * it will retrieve the llog unlink logs and then send the log cancellation
558 * cookies to the MDS after committing destroy transactions. */
559 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
560 struct lov_stripe_md *ea, struct obd_trans_info *oti,
561 struct obd_export *md_export)
563 struct ptlrpc_request *req;
564 struct ost_body *body;
565 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
573 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
574 OST_DESTROY, 2, size, NULL);
578 /* FIXME bug 249. Also see bug 7198 */
579 if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
580 OBD_CONNECT_REQPORTAL)
581 req->rq_request_portal = OST_IO_PORTAL;
583 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
585 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
586 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
587 sizeof(*oti->oti_logcookies));
590 memcpy(&body->oa, oa, sizeof(*oa));
591 ptlrpc_req_set_repsize(req, 2, size);
593 ptlrpcd_add_req(req);
597 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
600 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
602 LASSERT(!(oa->o_valid & bits));
605 client_obd_list_lock(&cli->cl_loi_list_lock);
606 oa->o_dirty = cli->cl_dirty;
607 if (cli->cl_dirty > cli->cl_dirty_max) {
608 CERROR("dirty %lu > dirty_max %lu\n",
609 cli->cl_dirty, cli->cl_dirty_max);
611 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
612 CERROR("dirty %d > system dirty_max %d\n",
613 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
615 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
616 CERROR("dirty %lu - dirty_max %lu too big???\n",
617 cli->cl_dirty, cli->cl_dirty_max);
620 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
621 (cli->cl_max_rpcs_in_flight + 1);
622 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
624 oa->o_grant = cli->cl_avail_grant;
625 oa->o_dropped = cli->cl_lost_grant;
626 cli->cl_lost_grant = 0;
627 client_obd_list_unlock(&cli->cl_loi_list_lock);
628 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
629 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
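/*
 * Illustrative numbers only, assuming 4 KiB pages (CFS_PAGE_SHIFT == 12),
 * cl_max_pages_per_rpc == 256 and cl_max_rpcs_in_flight == 8: max_in_flight
 * above works out to 256 * 4096 * (8 + 1) == 9 MiB, so o_undirty reports the
 * larger of cl_dirty_max and 9 MiB -- presumably enough headroom to keep a
 * full pipeline of RPCs dirty without outrunning the grant from the OST.
 */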
632 /* caller must hold loi_list_lock */
633 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
635 atomic_inc(&obd_dirty_pages);
636 cli->cl_dirty += CFS_PAGE_SIZE;
637 cli->cl_avail_grant -= CFS_PAGE_SIZE;
638 pga->flag |= OBD_BRW_FROM_GRANT;
639 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
640 CFS_PAGE_SIZE, pga, pga->pg);
641 LASSERT(cli->cl_avail_grant >= 0);
644 /* the companion to osc_consume_write_grant, called when a brw has completed.
645 * must be called with the loi lock held. */
646 static void osc_release_write_grant(struct client_obd *cli,
647 struct brw_page *pga, int sent)
649 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
652 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
657 pga->flag &= ~OBD_BRW_FROM_GRANT;
658 atomic_dec(&obd_dirty_pages);
659 cli->cl_dirty -= CFS_PAGE_SIZE;
661 cli->cl_lost_grant += CFS_PAGE_SIZE;
662 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
663 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
664 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
665 /* For short writes we shouldn't count parts of pages that
666 * span a whole block on the OST side, or our accounting goes
667 * wrong. Should match the code in filter_grant_check. */
668 int offset = pga->off & ~CFS_PAGE_MASK;
669 int count = pga->count + (offset & (blocksize - 1));
670 int end = (offset + pga->count) & (blocksize - 1);
672 count += blocksize - end;
674 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
675 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
676 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
677 cli->cl_avail_grant, cli->cl_dirty);
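/*
 * A worked example of the short-write accounting above, with assumed values
 * (CFS_PAGE_SIZE == 4096, blocksize == 512): a 700-byte write starting at
 * offset 100 within its page gives offset == 100, count == 700 + 100 == 800
 * and end == 800 & 511 == 288; since end != 0, count += 512 - 288 == 1024.
 * The write spans two full 512-byte blocks on the OST, so only
 * 4096 - 1024 == 3072 bytes of that page's grant are counted as lost.
 */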
683 static unsigned long rpcs_in_flight(struct client_obd *cli)
685 return cli->cl_r_in_flight + cli->cl_w_in_flight;
688 /* caller must hold loi_list_lock */
689 void osc_wake_cache_waiters(struct client_obd *cli)
691 struct list_head *l, *tmp;
692 struct osc_cache_waiter *ocw;
695 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
696 /* if we can't dirty more, we must wait until some is written */
697 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
698 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
699 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
700 "osc max %ld, sys max %d\n", cli->cl_dirty,
701 cli->cl_dirty_max, obd_max_dirty_pages);
705 /* if still dirty cache but no grant wait for pending RPCs that
706 * may yet return us some grant before doing sync writes */
707 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
708 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
709 cli->cl_w_in_flight);
713 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
714 list_del_init(&ocw->ocw_entry);
715 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
716 /* no more RPCs in flight to return grant, do sync IO */
717 ocw->ocw_rc = -EDQUOT;
718 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
720 osc_consume_write_grant(cli,
721 &ocw->ocw_oap->oap_brw_page);
724 cfs_waitq_signal(&ocw->ocw_waitq);
730 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
732 client_obd_list_lock(&cli->cl_loi_list_lock);
733 cli->cl_avail_grant = ocd->ocd_grant;
734 client_obd_list_unlock(&cli->cl_loi_list_lock);
736 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
737 cli->cl_avail_grant, cli->cl_lost_grant);
738 LASSERT(cli->cl_avail_grant >= 0);
741 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
743 client_obd_list_lock(&cli->cl_loi_list_lock);
744 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
745 cli->cl_avail_grant += body->oa.o_grant;
746 /* waiters are woken in brw_interpret_oap */
747 client_obd_list_unlock(&cli->cl_loi_list_lock);
750 /* We assume that the reason this OSC got a short read is because it read
751 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
752 * via the LOV, and it _knows_ it's reading inside the file, it's just that
753 * this stripe never got written at or beyond this stripe offset yet. */
754 static void handle_short_read(int nob_read, obd_count page_count,
755 struct brw_page **pga)
760 /* skip bytes read OK */
761 while (nob_read > 0) {
762 LASSERT (page_count > 0);
764 if (pga[i]->count > nob_read) {
765 /* EOF inside this page */
766 ptr = cfs_kmap(pga[i]->pg) +
767 (pga[i]->off & ~CFS_PAGE_MASK);
768 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
769 cfs_kunmap(pga[i]->pg);
775 nob_read -= pga[i]->count;
780 /* zero remaining pages */
781 while (page_count-- > 0) {
782 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
783 memset(ptr, 0, pga[i]->count);
784 cfs_kunmap(pga[i]->pg);
789 static int check_write_rcs(struct ptlrpc_request *req,
790 int requested_nob, int niocount,
791 obd_count page_count, struct brw_page **pga)
795 /* return error if any niobuf was in error */
796 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
797 sizeof(*remote_rcs) * niocount, NULL);
798 if (remote_rcs == NULL) {
799 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
802 if (lustre_msg_swabbed(req->rq_repmsg))
803 for (i = 0; i < niocount; i++)
804 __swab32s(&remote_rcs[i]);
806 for (i = 0; i < niocount; i++) {
807 if (remote_rcs[i] < 0)
808 return(remote_rcs[i]);
810 if (remote_rcs[i] != 0) {
811 CERROR("rc[%d] invalid (%d) req %p\n",
812 i, remote_rcs[i], req);
817 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
818 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
819 requested_nob, req->rq_bulk->bd_nob_transferred);
826 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
828 if (p1->flag != p2->flag) {
829 unsigned mask = ~OBD_BRW_FROM_GRANT;
831 /* warn if we try to combine flags that we don't know to be
833 if ((p1->flag & mask) != (p2->flag & mask))
834 CERROR("is it ok to have flags 0x%x and 0x%x in the "
835 "same brw?\n", p1->flag, p2->flag);
839 return (p1->off + p1->count == p2->off);
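/*
 * Illustrative only: two 4 KiB brw_pages covering file offsets [4096, 8192)
 * and [8192, 12288) with identical flags satisfy p1->off + p1->count ==
 * p2->off and are merged into a single niobuf_remote by the request-building
 * code below; a hole between them, or (judging by the check above) any flag
 * mismatch, keeps them in separate niobufs, and mismatches outside
 * OBD_BRW_FROM_GRANT additionally provoke the CERROR.
 */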
842 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
843 struct brw_page **pga)
848 LASSERT (pg_count > 0);
849 while (nob > 0 && pg_count > 0) {
850 char *ptr = cfs_kmap(pga[i]->pg);
851 int off = pga[i]->off & ~CFS_PAGE_MASK;
852 int count = pga[i]->count > nob ? nob : pga[i]->count;
854 /* corrupt the data before we compute the checksum, to
855 * simulate an OST->client data error */
856 if (i == 0 && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
857 memcpy(ptr + off, "bad1", min(4, nob));
858 cksum = crc32_le(cksum, ptr + off, count);
859 cfs_kunmap(pga[i]->pg);
860 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
863 nob -= pga[i]->count;
867 /* For sending we only compute the wrong checksum instead
868 * of corrupting the data so it is still correct on a redo */
869 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
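/*
 * A minimal sketch (illustrative, not part of the OSC) of why the per-page
 * loop above matches a single checksum over the whole transfer: the kernel's
 * crc32_le() is a streaming update, so feeding the running value back in as
 * the seed for each chunk composes to the same result.  The ~0 seed below is
 * only an assumption for the sketch.
 */
#if 0
static __u32 cksum_in_chunks(const unsigned char *buf, size_t len, size_t chunk)
{
        __u32 cksum = ~0;
        size_t off;

        for (off = 0; off < len; off += chunk) {
                size_t count = (len - off < chunk) ? (len - off) : chunk;

                /* feed the running checksum back in, as the bulk loop does */
                cksum = crc32_le(cksum, buf + off, count);
        }
        return cksum;                   /* == crc32_le(~0, buf, len) */
}
#endif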
875 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
876 struct lov_stripe_md *lsm, obd_count page_count,
877 struct brw_page **pga,
878 struct ptlrpc_request **reqp)
880 struct ptlrpc_request *req;
881 struct ptlrpc_bulk_desc *desc;
882 struct ost_body *body;
883 struct obd_ioobj *ioobj;
884 struct niobuf_remote *niobuf;
885 int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
886 int niocount, i, requested_nob, opc, rc;
887 struct ptlrpc_request_pool *pool;
888 struct osc_brw_async_args *aa;
891 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
892 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool : NULL;
894 for (niocount = i = 1; i < page_count; i++) {
895 if (!can_merge_pages(pga[i - 1], pga[i]))
899 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
900 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
902 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
903 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
908 /* FIXME bug 249. Also see bug 7198 */
909 if (cli->cl_import->imp_connect_data.ocd_connect_flags &
910 OBD_CONNECT_REQPORTAL)
911 req->rq_request_portal = OST_IO_PORTAL;
913 if (opc == OST_WRITE)
914 desc = ptlrpc_prep_bulk_imp (req, page_count,
915 BULK_GET_SOURCE, OST_BULK_PORTAL);
917 desc = ptlrpc_prep_bulk_imp (req, page_count,
918 BULK_PUT_SINK, OST_BULK_PORTAL);
920 GOTO(out, rc = -ENOMEM);
921 /* NB request now owns desc and will free it when it gets freed */
923 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
924 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
925 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
926 niocount * sizeof(*niobuf));
928 memcpy(&body->oa, oa, sizeof(*oa));
930 obdo_to_ioobj(oa, ioobj);
931 ioobj->ioo_bufcnt = niocount;
933 LASSERT (page_count > 0);
934 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
935 struct brw_page *pg = pga[i];
936 struct brw_page *pg_prev = pga[i - 1];
938 LASSERT(pg->count > 0);
939 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
940 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
943 LASSERTF(i == 0 || pg->off > pg_prev->off,
944 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
945 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
947 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
948 pg_prev->pg, page_private(pg_prev->pg),
949 pg_prev->pg->index, pg_prev->off);
951 LASSERTF(i == 0 || pg->off > pg_prev->off,
952 "i %d p_c %u\n", i, page_count);
954 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
955 (pg->flag & OBD_BRW_SRVLOCK));
957 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
959 requested_nob += pg->count;
961 if (i > 0 && can_merge_pages(pg_prev, pg)) {
963 niobuf->len += pg->count;
965 niobuf->offset = pg->off;
966 niobuf->len = pg->count;
967 niobuf->flags = pg->flag;
971 LASSERT((void *)(niobuf - niocount) ==
972 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
973 niocount * sizeof(*niobuf)));
974 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
976 /* size[REQ_REC_OFF] still sizeof (*body) */
977 if (opc == OST_WRITE) {
978 if (unlikely(cli->cl_checksum)) {
979 body->oa.o_valid |= OBD_MD_FLCKSUM;
980 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
982 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
984 /* save this in 'oa', too, for later checking */
985 oa->o_valid |= OBD_MD_FLCKSUM;
987 /* clear out the checksum flag, in case this is a
988 * resend but cl_checksum is no longer set. b=11238 */
989 oa->o_valid &= ~OBD_MD_FLCKSUM;
991 oa->o_cksum = body->oa.o_cksum;
992 /* 1 RC per niobuf */
993 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
994 ptlrpc_req_set_repsize(req, 3, size);
996 if (unlikely(cli->cl_checksum))
997 body->oa.o_valid |= OBD_MD_FLCKSUM;
998 /* 1 RC for the whole I/O */
999 ptlrpc_req_set_repsize(req, 2, size);
1002 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1003 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1005 aa->aa_requested_nob = requested_nob;
1006 aa->aa_nio_count = niocount;
1007 aa->aa_page_count = page_count;
1008 aa->aa_retries = 5; /* retry for checksum errors; lprocfs? */
1011 INIT_LIST_HEAD(&aa->aa_oaps);
1017 ptlrpc_req_finished (req);
1021 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1022 __u32 client_cksum, __u32 server_cksum, int nob,
1023 obd_count page_count, struct brw_page **pga)
1028 if (server_cksum == client_cksum) {
1029 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1033 new_cksum = osc_checksum_bulk(nob, page_count, pga);
1035 if (new_cksum == server_cksum)
1036 msg = "changed on the client after we checksummed it";
1037 else if (new_cksum == client_cksum)
1038 msg = "changed in transit before arrival at OST";
1040 msg = "changed in transit AND doesn't match the original";
1042 LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
1043 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1044 msg, libcfs_nid2str(peer->nid),
1045 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1046 oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
1048 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1050 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1051 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1052 client_cksum, server_cksum, new_cksum);
1057 /* Note rc enters this function as number of bytes transferred */
1058 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1060 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1061 const lnet_process_id_t *peer =
1062 &req->rq_import->imp_connection->c_peer;
1063 struct client_obd *cli = aa->aa_cli;
1064 struct ost_body *body;
1065 __u32 client_cksum = 0;
1068 if (rc < 0 && rc != -EDQUOT)
1071 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1072 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1073 lustre_swab_ost_body);
1075 CERROR ("Can't unpack body\n");
1079 /* set/clear over quota flag for a uid/gid */
1080 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1081 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1082 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1083 body->oa.o_gid, body->oa.o_valid,
1089 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1090 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1092 osc_update_grant(cli, body);
1094 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1096 CERROR ("Unexpected +ve rc %d\n", rc);
1099 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1101 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1103 check_write_checksum(&body->oa, peer, client_cksum,
1105 aa->aa_requested_nob,
1110 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1111 aa->aa_page_count, aa->aa_ppga);
1115 /* The rest of this function executes only for OST_READs */
1116 if (rc > aa->aa_requested_nob) {
1117 CERROR("Unexpected rc %d (%d requested)\n", rc,
1118 aa->aa_requested_nob);
1122 if (rc != req->rq_bulk->bd_nob_transferred) {
1123 CERROR ("Unexpected rc %d (%d transferred)\n",
1124 rc, req->rq_bulk->bd_nob_transferred);
1128 if (rc < aa->aa_requested_nob)
1129 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1131 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1132 static int cksum_counter;
1133 __u32 server_cksum = body->oa.o_cksum;
1134 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1137 if (server_cksum == ~0 && rc > 0) {
1138 CERROR("Protocol error: server %s set the 'checksum' "
1139 "bit, but didn't send a checksum. Not fatal, "
1140 "but please tell CFS.\n",
1141 libcfs_nid2str(peer->nid));
1142 } else if (server_cksum != client_cksum) {
1143 LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s inum "
1144 LPU64"/"LPU64" object "LPU64"/"LPU64
1145 " extent ["LPU64"-"LPU64"]\n",
1146 req->rq_import->imp_obd->obd_name,
1147 libcfs_nid2str(peer->nid),
1148 body->oa.o_valid & OBD_MD_FLFID ?
1149 body->oa.o_fid : (__u64)0,
1150 body->oa.o_valid & OBD_MD_FLFID ?
1151 body->oa.o_generation :(__u64)0,
1153 body->oa.o_valid & OBD_MD_FLGROUP ?
1154 body->oa.o_gr : (__u64)0,
1155 aa->aa_ppga[0]->off,
1156 aa->aa_ppga[aa->aa_page_count-1]->off +
1157 aa->aa_ppga[aa->aa_page_count-1]->count -
1159 CERROR("client %x, server %x\n",
1160 client_cksum, server_cksum);
1162 aa->aa_oa->o_cksum = client_cksum;
1166 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1169 } else if (unlikely(client_cksum)) {
1170 static int cksum_missed;
1173 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1174 CERROR("Checksum %u requested from %s but not sent\n",
1175 cksum_missed, libcfs_nid2str(peer->nid));
1181 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1186 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1187 struct lov_stripe_md *lsm,
1188 obd_count page_count, struct brw_page **pga)
1190 struct ptlrpc_request *request;
1191 int rc, retries = 5; /* lprocfs? */
1195 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1196 page_count, pga, &request);
1200 rc = ptlrpc_queue_wait(request);
1202 if (rc == -ETIMEDOUT && request->rq_resend) {
1203 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1204 ptlrpc_req_finished(request);
1208 rc = osc_brw_fini_request(request, rc);
1210 ptlrpc_req_finished(request);
1211 if (rc == -EAGAIN) {
1219 int osc_brw_redo_request(struct ptlrpc_request *request,
1220 struct osc_brw_async_args *aa)
1222 struct ptlrpc_request *new_req;
1223 struct ptlrpc_request_set *set = request->rq_set;
1224 struct osc_brw_async_args *new_aa;
1225 struct osc_async_page *oap;
1229 if (aa->aa_retries-- <= 0) {
1230 CERROR("too many checksum retries, returning error\n");
1234 DEBUG_REQ(D_ERROR, request, "redo for checksum error");
1235 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1236 if (oap->oap_request != NULL) {
1237 LASSERTF(request == oap->oap_request,
1238 "request %p != oap_request %p\n",
1239 request, oap->oap_request);
1240 if (oap->oap_interrupted) {
1241 ptlrpc_mark_interrupted(oap->oap_request);
1250 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1251 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1252 aa->aa_cli, aa->aa_oa,
1253 NULL /* lsm unused by osc currently */,
1254 aa->aa_page_count, aa->aa_ppga, &new_req);
1258 /* New request takes over pga and oaps from old request.
1259 * Note that copying a list_head doesn't work, need to move it... */
1260 new_req->rq_interpret_reply = request->rq_interpret_reply;
1261 new_req->rq_async_args = request->rq_async_args;
1262 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1263 INIT_LIST_HEAD(&new_aa->aa_oaps);
1264 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1265 INIT_LIST_HEAD(&aa->aa_oaps);
1267 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1268 if (oap->oap_request) {
1269 ptlrpc_req_finished(oap->oap_request);
1270 oap->oap_request = ptlrpc_request_addref(new_req);
1274 ptlrpc_set_add_req(set, new_req);
1279 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1281 struct osc_brw_async_args *aa = data;
1285 rc = osc_brw_fini_request(request, rc);
1286 if (rc == -EAGAIN) {
1287 rc = osc_brw_redo_request(request, aa);
1292 spin_lock(&aa->aa_cli->cl_loi_list_lock);
1293 for (i = 0; i < aa->aa_page_count; i++)
1294 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1295 spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1297 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1302 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1303 struct lov_stripe_md *lsm, obd_count page_count,
1304 struct brw_page **pga, struct ptlrpc_request_set *set)
1306 struct ptlrpc_request *request;
1307 struct client_obd *cli = &exp->exp_obd->u.cli;
1311 /* Consume write credits even if doing a sync write -
1312 * otherwise we may run out of space on OST due to grant. */
1313 spin_lock(&cli->cl_loi_list_lock);
1314 for (i = 0; i < page_count; i++) {
1315 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1316 osc_consume_write_grant(cli, pga[i]);
1318 spin_unlock(&cli->cl_loi_list_lock);
1320 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1321 page_count, pga, &request);
1324 request->rq_interpret_reply = brw_interpret;
1325 ptlrpc_set_add_req(set, request);
1327 spin_lock(&cli->cl_loi_list_lock);
1328 for (i = 0; i < page_count; i++)
1329 osc_release_write_grant(cli, pga[i], 0);
1330 spin_unlock(&cli->cl_loi_list_lock);
1337 * ugh, we want disk allocation on the target to happen in offset order. we'll
1338 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1339 * fine for our small page arrays and doesn't require allocation. it's an
1340 * insertion sort that swaps elements that are strides apart, shrinking the
1341 * stride down until it's 1 and the array is sorted.
1343 static void sort_brw_pages(struct brw_page **array, int num)
1346 struct brw_page *tmp;
1350 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1355 for (i = stride ; i < num ; i++) {
1358 while (j >= stride && array[j-stride]->off > tmp->off) {
1359 array[j] = array[j - stride];
1364 } while (stride > 1);
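/*
 * A standalone sketch of the same Sedgewick-style shellsort over a plain int
 * array (illustrative only, not used anywhere): the stride grows as 1, 4, 13,
 * 40, ... via stride = stride * 3 + 1 and is divided by 3 before each
 * insertion-sort pass, exactly as sort_brw_pages() does above.
 */
#if 0
static void shellsort_ints(int *a, int num)
{
        int tmp, i, j, stride = 1;

        while (stride < num)
                stride = (stride * 3) + 1;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        j = i;
                        /* insertion sort among elements one stride apart */
                        while (j >= stride && a[j - stride] > tmp) {
                                a[j] = a[j - stride];
                                j -= stride;
                        }
                        a[j] = tmp;
                }
        } while (stride > 1);
}
#endif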
1367 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1373 LASSERT (pages > 0);
1374 offset = pg[i]->off & (~CFS_PAGE_MASK);
1378 if (pages == 0) /* that's all */
1381 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1382 return count; /* doesn't end on page boundary */
1385 offset = pg[i]->off & (~CFS_PAGE_MASK);
1386 if (offset != 0) /* doesn't start on page boundary */
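/*
 * Illustrative only, assuming 4 KiB pages: given brw_pages covering file
 * offsets [0, 4096), [4096, 8192) and [8192, 10240), the first two end on a
 * page boundary and the later ones start on one, so all three are counted and
 * can be sent in a single unfragmented transfer.  If the second page instead
 * started at file offset 4608 (offset 512 within its page), the scan would
 * stop and return 1, leaving the rest for a later RPC.
 */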
1393 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1395 struct brw_page **ppga;
1398 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1402 for (i = 0; i < count; i++)
1407 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1409 LASSERT(ppga != NULL);
1410 OBD_FREE(ppga, sizeof(*ppga) * count);
1413 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1414 obd_count page_count, struct brw_page *pga,
1415 struct obd_trans_info *oti)
1417 struct obdo *saved_oa = NULL;
1418 struct brw_page **ppga, **orig;
1419 struct obd_import *imp = class_exp2cliimp(exp);
1420 struct client_obd *cli = &imp->imp_obd->u.cli;
1421 int rc, page_count_orig;
1424 if (cmd & OBD_BRW_CHECK) {
1425 /* The caller just wants to know if there's a chance that this
1426 * I/O can succeed */
1428 if (imp == NULL || imp->imp_invalid)
1433 /* test_brw with a failed create can trip this, maybe others. */
1434 LASSERT(cli->cl_max_pages_per_rpc);
1438 orig = ppga = osc_build_ppga(pga, page_count);
1441 page_count_orig = page_count;
1443 sort_brw_pages(ppga, page_count);
1444 while (page_count) {
1445 obd_count pages_per_brw;
1447 if (page_count > cli->cl_max_pages_per_rpc)
1448 pages_per_brw = cli->cl_max_pages_per_rpc;
1450 pages_per_brw = page_count;
1452 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1454 if (saved_oa != NULL) {
1455 /* restore previously saved oa */
1456 *oinfo->oi_oa = *saved_oa;
1457 } else if (page_count > pages_per_brw) {
1458 /* save a copy of oa (brw will clobber it) */
1459 saved_oa = obdo_alloc();
1460 if (saved_oa == NULL)
1461 GOTO(out, rc = -ENOMEM);
1462 *saved_oa = *oinfo->oi_oa;
1465 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1466 pages_per_brw, ppga);
1471 page_count -= pages_per_brw;
1472 ppga += pages_per_brw;
1476 osc_release_ppga(orig, page_count_orig);
1478 if (saved_oa != NULL)
1479 obdo_free(saved_oa);
1484 static int osc_brw_async(int cmd, struct obd_export *exp,
1485 struct obd_info *oinfo, obd_count page_count,
1486 struct brw_page *pga, struct obd_trans_info *oti,
1487 struct ptlrpc_request_set *set)
1489 struct brw_page **ppga, **orig;
1490 int page_count_orig;
1494 if (cmd & OBD_BRW_CHECK) {
1495 /* The caller just wants to know if there's a chance that this
1496 * I/O can succeed */
1497 struct obd_import *imp = class_exp2cliimp(exp);
1499 if (imp == NULL || imp->imp_invalid)
1504 orig = ppga = osc_build_ppga(pga, page_count);
1507 page_count_orig = page_count;
1509 sort_brw_pages(ppga, page_count);
1510 while (page_count) {
1511 struct brw_page **copy;
1512 obd_count pages_per_brw;
1514 pages_per_brw = min_t(obd_count, page_count,
1515 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1517 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1519 /* use ppga only if single RPC is going to fly */
1520 if (pages_per_brw != page_count_orig || ppga != orig) {
1521 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1523 GOTO(out, rc = -ENOMEM);
1524 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1528 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1529 pages_per_brw, copy, set);
1533 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1538 /* we passed it to async_internal() which is
1539 * now responsible for releasing memory */
1543 page_count -= pages_per_brw;
1544 ppga += pages_per_brw;
1548 osc_release_ppga(orig, page_count_orig);
1552 static void osc_check_rpcs(struct client_obd *cli);
1554 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1555 * the dirty accounting. Writeback completes or truncate happens before
1556 * writing starts. Must be called with the loi lock held. */
1557 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1560 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1563 /* This maintains the lists of pending pages to read/write for a given object
1564 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1565 * to quickly find objects that are ready to send an RPC. */
1566 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1572 if (lop->lop_num_pending == 0)
1575 /* if we have an invalid import we want to drain the queued pages
1576 * by forcing them through rpcs that immediately fail and complete
1577 * the pages. recovery relies on this to empty the queued pages
1578 * before canceling the locks and evicting down the llite pages */
1579 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1582 /* stream rpcs in queue order as long as there is an urgent page
1583 * queued. this is our cheap solution for good batching in the case
1584 * where writepage marks some random page in the middle of the file
1585 * as urgent because of, say, memory pressure */
1586 if (!list_empty(&lop->lop_urgent)) {
1587 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1591 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1592 optimal = cli->cl_max_pages_per_rpc;
1593 if (cmd & OBD_BRW_WRITE) {
1594 /* trigger a write rpc stream as long as there are dirtiers
1595 * waiting for space. as they're waiting, they're not going to
1596 * create more pages to coalesce with what's waiting. */
1597 if (!list_empty(&cli->cl_cache_waiters)) {
1598 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1602 /* +16 to avoid triggering rpcs that would want to include pages
1603 * that are being queued but which can't be made ready until
1604 * the queuer finishes with the page. this is a wart for
1605 * llite::commit_write() */
1608 if (lop->lop_num_pending >= optimal)
1614 static void on_list(struct list_head *item, struct list_head *list,
1617 if (list_empty(item) && should_be_on)
1618 list_add_tail(item, list);
1619 else if (!list_empty(item) && !should_be_on)
1620 list_del_init(item);
1623 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1624 * can find pages to build into rpcs quickly */
1625 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1627 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1628 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1629 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1631 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1632 loi->loi_write_lop.lop_num_pending);
1634 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1635 loi->loi_read_lop.lop_num_pending);
1638 static void lop_update_pending(struct client_obd *cli,
1639 struct loi_oap_pages *lop, int cmd, int delta)
1641 lop->lop_num_pending += delta;
1642 if (cmd & OBD_BRW_WRITE)
1643 cli->cl_pending_w_pages += delta;
1645 cli->cl_pending_r_pages += delta;
1648 /* this is called when a sync waiter receives an interruption. Its job is to
1649 * get the caller woken as soon as possible. If its page hasn't been put in an
1650 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1651 * desiring interruption which will forcefully complete the rpc once the rpc
1653 static void osc_occ_interrupted(struct oig_callback_context *occ)
1655 struct osc_async_page *oap;
1656 struct loi_oap_pages *lop;
1657 struct lov_oinfo *loi;
1660 /* XXX member_of() */
1661 oap = list_entry(occ, struct osc_async_page, oap_occ);
1663 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1665 oap->oap_interrupted = 1;
1667 /* ok, it's been put in an rpc. only one oap gets a request reference */
1668 if (oap->oap_request != NULL) {
1669 ptlrpc_mark_interrupted(oap->oap_request);
1670 ptlrpcd_wake(oap->oap_request);
1674 /* we don't get interruption callbacks until osc_trigger_group_io()
1675 * has been called and put the sync oaps in the pending/urgent lists. */
1676 if (!list_empty(&oap->oap_pending_item)) {
1677 list_del_init(&oap->oap_pending_item);
1678 list_del_init(&oap->oap_urgent_item);
1681 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1682 &loi->loi_write_lop : &loi->loi_read_lop;
1683 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1684 loi_list_maint(oap->oap_cli, oap->oap_loi);
1686 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1687 oap->oap_oig = NULL;
1691 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1694 /* this is trying to propagate async writeback errors back up to the
1695 * application. As an async write fails we record the error code for later if
1696 * the app does an fsync. As long as errors persist we force future rpcs to be
1697 * sync so that the app can get a sync error and break the cycle of queueing
1698 * pages for which writeback will fail. */
1699 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1706 ar->ar_force_sync = 1;
1707 ar->ar_min_xid = ptlrpc_sample_next_xid();
1712 if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1713 ar->ar_force_sync = 0;
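/*
 * A hedged reading of the bookkeeping above: when an async write fails,
 * ar_force_sync is set and ar_min_xid records the next request xid to be
 * issued; later writes are forced synchronous until a request whose xid is at
 * least ar_min_xid completes successfully, at which point ar_force_sync is
 * cleared and cached writeback can resume.
 */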
1716 static void osc_oap_to_pending(struct osc_async_page *oap)
1718 struct loi_oap_pages *lop;
1720 if (oap->oap_cmd & OBD_BRW_WRITE)
1721 lop = &oap->oap_loi->loi_write_lop;
1723 lop = &oap->oap_loi->loi_read_lop;
1725 if (oap->oap_async_flags & ASYNC_URGENT)
1726 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1727 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1728 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1731 /* this must be called holding the loi list lock to give coverage to exit_cache,
1732 * async_flag maintenance, and oap_request */
1733 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1734 struct osc_async_page *oap, int sent, int rc)
1737 oap->oap_async_flags = 0;
1738 oap->oap_interrupted = 0;
1740 if (oap->oap_cmd & OBD_BRW_WRITE) {
1741 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1742 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1745 if (oap->oap_request != NULL) {
1746 ptlrpc_req_finished(oap->oap_request);
1747 oap->oap_request = NULL;
1750 if (rc == 0 && oa != NULL) {
1751 if (oa->o_valid & OBD_MD_FLBLOCKS)
1752 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1753 if (oa->o_valid & OBD_MD_FLMTIME)
1754 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1755 if (oa->o_valid & OBD_MD_FLATIME)
1756 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1757 if (oa->o_valid & OBD_MD_FLCTIME)
1758 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1762 osc_exit_cache(cli, oap, sent);
1763 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1764 oap->oap_oig = NULL;
1769 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1770 oap->oap_cmd, oa, rc);
1772 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1773 * I/O on the page could start, but OSC calls it under lock
1774 * and thus we can add oap back to pending safely */
1776 /* upper layer wants to leave the page on pending queue */
1777 osc_oap_to_pending(oap);
1779 osc_exit_cache(cli, oap, sent);
1783 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1785 struct osc_brw_async_args *aa = data;
1786 struct osc_async_page *oap, *tmp;
1787 struct client_obd *cli;
1790 rc = osc_brw_fini_request(request, rc);
1791 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1792 if (rc == -EAGAIN) {
1793 rc = osc_brw_redo_request(request, aa);
1801 client_obd_list_lock(&cli->cl_loi_list_lock);
1803 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1804 * is called so we know whether to go to sync BRWs or wait for more
1805 * RPCs to complete */
1806 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1807 cli->cl_w_in_flight--;
1809 cli->cl_r_in_flight--;
1811 /* the caller may re-use the oap after the completion call so
1812 * we need to clean it up a little */
1813 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1814 list_del_init(&oap->oap_rpc_item);
1815 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1818 osc_wake_cache_waiters(cli);
1819 osc_check_rpcs(cli);
1821 client_obd_list_unlock(&cli->cl_loi_list_lock);
1823 obdo_free(aa->aa_oa);
1827 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1831 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1832 struct list_head *rpc_list,
1833 int page_count, int cmd)
1835 struct ptlrpc_request *req;
1836 struct brw_page **pga = NULL;
1837 struct osc_brw_async_args *aa;
1838 struct obdo *oa = NULL;
1839 struct obd_async_page_ops *ops = NULL;
1840 void *caller_data = NULL;
1841 struct osc_async_page *oap;
1845 LASSERT(!list_empty(rpc_list));
1847 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1849 RETURN(ERR_PTR(-ENOMEM));
1853 GOTO(out, req = ERR_PTR(-ENOMEM));
1856 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1858 ops = oap->oap_caller_ops;
1859 caller_data = oap->oap_caller_data;
1861 pga[i] = &oap->oap_brw_page;
1862 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1863 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1864 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1868 /* always get the data for the obdo for the rpc */
1869 LASSERT(ops != NULL);
1870 ops->ap_fill_obdo(caller_data, cmd, oa);
1872 sort_brw_pages(pga, page_count);
1873 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1875 CERROR("prep_req failed: %d\n", rc);
1876 GOTO(out, req = ERR_PTR(rc));
1879 /* Need to update the timestamps after the request is built in case
1880 * we race with setattr (locally or in queue at OST). If OST gets
1881 * later setattr before earlier BRW (as determined by the request xid),
1882 * the OST will not use BRW timestamps. Sadly, there is no obvious
1883 * way to do this in a single call. bug 10150 */
1884 ops->ap_update_obdo(caller_data, cmd, oa,
1885 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1887 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1888 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1889 INIT_LIST_HEAD(&aa->aa_oaps);
1890 list_splice(rpc_list, &aa->aa_oaps);
1891 INIT_LIST_HEAD(rpc_list);
1898 OBD_FREE(pga, sizeof(*pga) * page_count);
1903 /* the loi lock is held across this function but it's allowed to release
1904 * and reacquire it during its work */
1905 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1906 int cmd, struct loi_oap_pages *lop)
1908 struct ptlrpc_request *req;
1909 obd_count page_count = 0;
1910 struct osc_async_page *oap = NULL, *tmp;
1911 struct osc_brw_async_args *aa;
1912 struct obd_async_page_ops *ops;
1913 CFS_LIST_HEAD(rpc_list);
1914 unsigned int ending_offset;
1915 unsigned starting_offset = 0;
1918 /* first we find the pages we're allowed to work with */
1919 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
1920 ops = oap->oap_caller_ops;
1922 LASSERT(oap->oap_magic == OAP_MAGIC);
1924 /* in llite being 'ready' equates to the page being locked
1925 * until completion unlocks it. commit_write submits a page
1926 * as not ready because its unlock will happen unconditionally
1927 * as the call returns. if we race with commit_write giving
1928 * us that page we don't want to create a hole in the page
1929 * stream, so we stop and leave the rpc to be fired by
1930 * another dirtier or kupdated interval (the not ready page
1931 * will still be on the dirty list). we could call in
1932 * at the end of ll_file_write to process the queue again. */
1933 if (!(oap->oap_async_flags & ASYNC_READY)) {
1934 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1936 CDEBUG(D_INODE, "oap %p page %p returned %d "
1937 "instead of ready\n", oap,
1941 /* llite is telling us that the page is still
1942 * in commit_write and that we should try
1943 * and put it in an rpc again later. we
1944 * break out of the loop so we don't create
1945 * a hole in the sequence of pages in the rpc
1950 /* the io isn't needed.. tell the checks
1951 * below to complete the rpc with EINTR */
1952 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1953 oap->oap_count = -EINTR;
1956 oap->oap_async_flags |= ASYNC_READY;
1959 LASSERTF(0, "oap %p page %p returned %d "
1960 "from make_ready\n", oap,
1968 * Page submitted for IO has to be locked. Either by
1969 * ->ap_make_ready() or by higher layers.
1971 * XXX nikita: this assertion should be adjusted when lustre
1972 * starts using PG_writeback for pages being written out.
1974 #if defined(__KERNEL__) && defined(__LINUX__)
1975 LASSERT(PageLocked(oap->oap_page));
1977 /* If there is a gap at the start of this page, it can't merge
1978 * with any previous page, so we'll hand the network a
1979 * "fragmented" page array that it can't transfer in 1 RDMA */
1980 if (page_count != 0 && oap->oap_page_off != 0)
1983 /* take the page out of our book-keeping */
1984 list_del_init(&oap->oap_pending_item);
1985 lop_update_pending(cli, lop, cmd, -1);
1986 list_del_init(&oap->oap_urgent_item);
1988 if (page_count == 0)
1989 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
1990 (PTLRPC_MAX_BRW_SIZE - 1);
1992 /* ask the caller for the size of the io as the rpc leaves. */
1993 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
1995 ops->ap_refresh_count(oap->oap_caller_data,cmd);
1996 if (oap->oap_count <= 0) {
1997 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
1999 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2003 /* now put the page back in our accounting */
2004 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2005 if (++page_count >= cli->cl_max_pages_per_rpc)
2008 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2009 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2010 * have the same alignment as the initial writes that allocated
2011 * extents on the server. */
2012 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2013 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2014 if (ending_offset == 0)
2017 /* If there is a gap at the end of this page, it can't merge
2018 * with any subsequent pages, so we'll hand the network a
2019 * "fragmented" page array that it can't transfer in 1 RDMA */
2020 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2024 osc_wake_cache_waiters(cli);
2026 if (page_count == 0)
2029 loi_list_maint(cli, loi);
2031 client_obd_list_unlock(&cli->cl_loi_list_lock);
2033 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2035 /* this should happen rarely and is pretty bad, it makes the
2036 * pending list not follow the dirty order */
2037 client_obd_list_lock(&cli->cl_loi_list_lock);
2038 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2039 list_del_init(&oap->oap_rpc_item);
2041 /* queued sync pages can be torn down while the pages
2042 * were between the pending list and the rpc */
2043 if (oap->oap_interrupted) {
2044 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2045 osc_ap_completion(cli, NULL, oap, 0,
2049 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2051 loi_list_maint(cli, loi);
2052 RETURN(PTR_ERR(req));
2055 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2056 if (cmd == OBD_BRW_READ) {
2057 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2058 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2059 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2060 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2061 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2063 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2064 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2065 cli->cl_w_in_flight);
2066 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2067 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2068 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2071 client_obd_list_lock(&cli->cl_loi_list_lock);
2073 if (cmd == OBD_BRW_READ)
2074 cli->cl_r_in_flight++;
2076 cli->cl_w_in_flight++;
2078 /* queued sync pages can be torn down while the pages
2079 * were between the pending list and the rpc */
2081 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2082 /* only one oap gets a request reference */
2085 if (oap->oap_interrupted && !req->rq_intr) {
2086 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2088 ptlrpc_mark_interrupted(req);
2092 tmp->oap_request = ptlrpc_request_addref(req);
2094 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2095 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2097 req->rq_interpret_reply = brw_interpret_oap;
2098 ptlrpcd_add_req(req);
2102 #define LOI_DEBUG(LOI, STR, args...) \
2103 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2104 !list_empty(&(LOI)->loi_cli_item), \
2105 (LOI)->loi_write_lop.lop_num_pending, \
2106 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2107 (LOI)->loi_read_lop.lop_num_pending, \
2108 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2111 /* This is called by osc_check_rpcs() to find which objects have pages that
2112 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2113 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2116 /* first return all objects which we already know to have
2117 * pages ready to be stuffed into rpcs */
2118 if (!list_empty(&cli->cl_loi_ready_list))
2119 RETURN(list_entry(cli->cl_loi_ready_list.next,
2120 struct lov_oinfo, loi_cli_item));
2122 /* then if we have cache waiters, return all objects with queued
2123 * writes. This is especially important when many small files
2124 * have filled up the cache and not been fired into rpcs because
2125 * they don't pass the nr_pending/object threshold */
2126 if (!list_empty(&cli->cl_cache_waiters) &&
2127 !list_empty(&cli->cl_loi_write_list))
2128 RETURN(list_entry(cli->cl_loi_write_list.next,
2129 struct lov_oinfo, loi_write_item));
2131 /* then return all queued objects when we have an invalid import
2132 * so that they get flushed */
2133 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2134 if (!list_empty(&cli->cl_loi_write_list))
2135 RETURN(list_entry(cli->cl_loi_write_list.next,
2136 struct lov_oinfo, loi_write_item));
2137 if (!list_empty(&cli->cl_loi_read_list))
2138 RETURN(list_entry(cli->cl_loi_read_list.next,
2139 struct lov_oinfo, loi_read_item));
2144 /* called with the loi list lock held */
2145 static void osc_check_rpcs(struct client_obd *cli)
2147 struct lov_oinfo *loi;
2148 int rc = 0, race_counter = 0;
2151 while ((loi = osc_next_loi(cli)) != NULL) {
2152 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2154 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2157 /* attempt some read/write balancing by alternating between
2158 * reads and writes in an object. The makes_rpc checks here
2159 * would be redundant if we were getting read/write work items
2160 * instead of objects. We don't want send_oap_rpc to drain a
2161 * partial read pending queue when we were given this object to
2162 * do write io on while there are cache waiters */
2163 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2164 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2165 &loi->loi_write_lop);
2173 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2174 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2175 &loi->loi_read_lop);
2184 /* attempt some inter-object balancing by issuing rpcs
2185 * for each object in turn */
2186 if (!list_empty(&loi->loi_cli_item))
2187 list_del_init(&loi->loi_cli_item);
2188 if (!list_empty(&loi->loi_write_item))
2189 list_del_init(&loi->loi_write_item);
2190 if (!list_empty(&loi->loi_read_item))
2191 list_del_init(&loi->loi_read_item);
2193 loi_list_maint(cli, loi);
2195 /* send_oap_rpc fails with 0 when make_ready tells it to
2196 * back off. llite's make_ready does this when it tries
2197 * to lock a page queued for write that is already locked.
2198 * we want to try sending rpcs from many objects, but we
2199 * don't want to spin failing with 0. */
2200 if (race_counter == 10)
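        /* Illustrative sketch (added, not in the original file): the
         * back-off pattern described above, reduced to plain C.  The helper
         * name try_send_one_object() is hypothetical and stands in for
         * osc_send_oap_rpc(): >0 means progress, 0 means make_ready asked
         * us to back off, <0 is a hard error. */
#if 0
        {
                int rc, race_counter = 0;

                for (;;) {
                        rc = try_send_one_object();     /* hypothetical */
                        if (rc > 0)
                                race_counter = 0;       /* progress: reset */
                        else if (rc == 0)
                                race_counter++;         /* backed off */
                        else
                                break;                  /* hard error */
                        if (race_counter == 10)
                                break;          /* don't spin failing with 0 */
                }
        }
#endif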
2206 /* we're trying to queue a page in the osc so we're subject to the
2207 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2208 * If the osc's queued pages are already at that limit, then we want to sleep
2209 * until there is space in the osc's queue for us. We also may be waiting for
2210 * write credits from the OST if there are RPCs in flight that may return some
2211 * before we fall back to sync writes.
2213 * We need this to know whether our allocation was granted even in the presence of signals */
2214 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2218 client_obd_list_lock(&cli->cl_loi_list_lock);
2219 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2220 client_obd_list_unlock(&cli->cl_loi_list_lock);
2224 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2225 * grant or cache space. */
2226 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2227 struct osc_async_page *oap)
2229 struct osc_cache_waiter ocw;
2230 struct l_wait_info lwi = { 0 };
2233 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2234 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2235 cli->cl_dirty_max, obd_max_dirty_pages,
2236 cli->cl_lost_grant, cli->cl_avail_grant);
2238 /* force the caller to try sync io. this can jump the list
2239 * of queued writes and create a discontiguous rpc stream */
2240 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2241 loi->loi_ar.ar_force_sync)
2244 /* Hopefully normal case - cache space and write credits available */
2245 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2246 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2247 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2248 /* account for ourselves */
2249 osc_consume_write_grant(cli, &oap->oap_brw_page);
2253 /* Make sure that there are write rpcs in flight to wait for. This
2254 * is a little silly, as this object may not have any pending writes,
2255 * but other objects certainly might. */
2256 if (cli->cl_w_in_flight) {
2257 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2258 cfs_waitq_init(&ocw.ocw_waitq);
2262 loi_list_maint(cli, loi);
2263 osc_check_rpcs(cli);
2264 client_obd_list_unlock(&cli->cl_loi_list_lock);
2266 CDEBUG(D_CACHE, "sleeping for cache space\n");
2267 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2269 client_obd_list_lock(&cli->cl_loi_list_lock);
2270 if (!list_empty(&ocw.ocw_entry)) {
2271 list_del(&ocw.ocw_entry);
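        /* Note (added, not in the original source): the handshake assumed
         * here is that a waiter queues ocw.ocw_entry on cl_cache_waiters and
         * sleeps on ocw.ocw_waitq; osc_wake_cache_waiters() removes the
         * entry (and, presumably, accounts the grant) before waking it.
         * Finding ourselves still on the list after l_wait_event() therefore
         * means we were woken without being granted -- e.g. because no write
         * RPCs remained in flight -- so we unlink ourselves and the caller
         * falls back to sync writes. */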
2280 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2281 struct lov_oinfo *loi, cfs_page_t *page,
2282 obd_off offset, struct obd_async_page_ops *ops,
2283 void *data, void **res)
2285 struct osc_async_page *oap;
2289 return size_round(sizeof(*oap));
2292 oap->oap_magic = OAP_MAGIC;
2293 oap->oap_cli = &exp->exp_obd->u.cli;
2296 oap->oap_caller_ops = ops;
2297 oap->oap_caller_data = data;
2299 oap->oap_page = page;
2300 oap->oap_obj_off = offset;
2302 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2303 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2304 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2306 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2308 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2312 struct osc_async_page *oap_from_cookie(void *cookie)
2314 struct osc_async_page *oap = cookie;
2315 if (oap->oap_magic != OAP_MAGIC)
2316 return ERR_PTR(-EINVAL);
2320 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2321 struct lov_oinfo *loi, void *cookie,
2322 int cmd, obd_off off, int count,
2323 obd_flag brw_flags, enum async_flags async_flags)
2325 struct client_obd *cli = &exp->exp_obd->u.cli;
2326 struct osc_async_page *oap;
2330 oap = oap_from_cookie(cookie);
2332 RETURN(PTR_ERR(oap));
2334 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2337 if (!list_empty(&oap->oap_pending_item) ||
2338 !list_empty(&oap->oap_urgent_item) ||
2339 !list_empty(&oap->oap_rpc_item))
2342 /* check if the file's owner/group is over quota */
2343 #ifdef HAVE_QUOTA_SUPPORT
2344 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2345 struct obd_async_page_ops *ops;
2352 ops = oap->oap_caller_ops;
2353 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2354 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2365 loi = lsm->lsm_oinfo[0];
2367 client_obd_list_lock(&cli->cl_loi_list_lock);
2370 oap->oap_page_off = off;
2371 oap->oap_count = count;
2372 oap->oap_brw_flags = brw_flags;
2373 oap->oap_async_flags = async_flags;
2375 if (cmd & OBD_BRW_WRITE) {
2376 rc = osc_enter_cache(cli, loi, oap);
2378 client_obd_list_unlock(&cli->cl_loi_list_lock);
2383 osc_oap_to_pending(oap);
2384 loi_list_maint(cli, loi);
2386 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2389 osc_check_rpcs(cli);
2390 client_obd_list_unlock(&cli->cl_loi_list_lock);
2395 /* aka (~was & now & flag), but this is more clear :) */
2396 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
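/* Example (added, not in the original file): SETTING() reports only flags
 * that this call is turning on.  With plain stand-in bits and <assert.h>: */
#if 0
{
        int was = 0x1, now = 0x1 | 0x2;

        assert( SETTING(was, now, 0x2));        /* 0x2 is being set now       */
        assert(!SETTING(was, now, 0x1));        /* 0x1 was already set before */
}
#endif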
2398 static int osc_set_async_flags(struct obd_export *exp,
2399 struct lov_stripe_md *lsm,
2400 struct lov_oinfo *loi, void *cookie,
2401 obd_flag async_flags)
2403 struct client_obd *cli = &exp->exp_obd->u.cli;
2404 struct loi_oap_pages *lop;
2405 struct osc_async_page *oap;
2409 oap = oap_from_cookie(cookie);
2411 RETURN(PTR_ERR(oap));
2414 * bug 7311: OST-side locking is only supported by liblustre for now
2415 * (and liblustre never calls obd_set_async_flags(), I hope). A generic
2416 * implementation has to handle the case where an OST-locked page was
2417 * picked up by, e.g., ->writepage().
2419 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2420 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to tread here */
2423 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2427 loi = lsm->lsm_oinfo[0];
2429 if (oap->oap_cmd & OBD_BRW_WRITE) {
2430 lop = &loi->loi_write_lop;
2432 lop = &loi->loi_read_lop;
2435 client_obd_list_lock(&cli->cl_loi_list_lock);
2437 if (list_empty(&oap->oap_pending_item))
2438 GOTO(out, rc = -EINVAL);
2440 if ((oap->oap_async_flags & async_flags) == async_flags)
2443 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2444 oap->oap_async_flags |= ASYNC_READY;
2446 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2447 if (list_empty(&oap->oap_rpc_item)) {
2448 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2449 loi_list_maint(cli, loi);
2453 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2454 oap->oap_async_flags);
2456 osc_check_rpcs(cli);
2457 client_obd_list_unlock(&cli->cl_loi_list_lock);
2461 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2462 struct lov_oinfo *loi,
2463 struct obd_io_group *oig, void *cookie,
2464 int cmd, obd_off off, int count,
2466 obd_flag async_flags)
2468 struct client_obd *cli = &exp->exp_obd->u.cli;
2469 struct osc_async_page *oap;
2470 struct loi_oap_pages *lop;
2474 oap = oap_from_cookie(cookie);
2476 RETURN(PTR_ERR(oap));
2478 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2481 if (!list_empty(&oap->oap_pending_item) ||
2482 !list_empty(&oap->oap_urgent_item) ||
2483 !list_empty(&oap->oap_rpc_item))
2487 loi = lsm->lsm_oinfo[0];
2489 client_obd_list_lock(&cli->cl_loi_list_lock);
2492 oap->oap_page_off = off;
2493 oap->oap_count = count;
2494 oap->oap_brw_flags = brw_flags;
2495 oap->oap_async_flags = async_flags;
2497 if (cmd & OBD_BRW_WRITE)
2498 lop = &loi->loi_write_lop;
2500 lop = &loi->loi_read_lop;
2502 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2503 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2505 rc = oig_add_one(oig, &oap->oap_occ);
2508 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2509 oap, oap->oap_page, rc);
2511 client_obd_list_unlock(&cli->cl_loi_list_lock);
2516 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2517 struct loi_oap_pages *lop, int cmd)
2519 struct list_head *pos, *tmp;
2520 struct osc_async_page *oap;
2522 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2523 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2524 list_del(&oap->oap_pending_item);
2525 osc_oap_to_pending(oap);
2527 loi_list_maint(cli, loi);
2530 static int osc_trigger_group_io(struct obd_export *exp,
2531 struct lov_stripe_md *lsm,
2532 struct lov_oinfo *loi,
2533 struct obd_io_group *oig)
2535 struct client_obd *cli = &exp->exp_obd->u.cli;
2539 loi = lsm->lsm_oinfo[0];
2541 client_obd_list_lock(&cli->cl_loi_list_lock);
2543 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2544 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2546 osc_check_rpcs(cli);
2547 client_obd_list_unlock(&cli->cl_loi_list_lock);
2552 static int osc_teardown_async_page(struct obd_export *exp,
2553 struct lov_stripe_md *lsm,
2554 struct lov_oinfo *loi, void *cookie)
2556 struct client_obd *cli = &exp->exp_obd->u.cli;
2557 struct loi_oap_pages *lop;
2558 struct osc_async_page *oap;
2562 oap = oap_from_cookie(cookie);
2564 RETURN(PTR_ERR(oap));
2567 loi = lsm->lsm_oinfo[0];
2569 if (oap->oap_cmd & OBD_BRW_WRITE) {
2570 lop = &loi->loi_write_lop;
2572 lop = &loi->loi_read_lop;
2575 client_obd_list_lock(&cli->cl_loi_list_lock);
2577 if (!list_empty(&oap->oap_rpc_item))
2578 GOTO(out, rc = -EBUSY);
2580 osc_exit_cache(cli, oap, 0);
2581 osc_wake_cache_waiters(cli);
2583 if (!list_empty(&oap->oap_urgent_item)) {
2584 list_del_init(&oap->oap_urgent_item);
2585 oap->oap_async_flags &= ~ASYNC_URGENT;
2587 if (!list_empty(&oap->oap_pending_item)) {
2588 list_del_init(&oap->oap_pending_item);
2589 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2591 loi_list_maint(cli, loi);
2593 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2595 client_obd_list_unlock(&cli->cl_loi_list_lock);
2599 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2602 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2605 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2608 lock_res_and_lock(lock);
2611 /* Liang XXX: Darwin and Winnt checking should be added */
2612 if (lock->l_ast_data && lock->l_ast_data != data) {
2613 struct inode *new_inode = data;
2614 struct inode *old_inode = lock->l_ast_data;
2615 if (!(old_inode->i_state & I_FREEING))
2616 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2617 LASSERTF(old_inode->i_state & I_FREEING,
2618 "Found existing inode %p/%lu/%u state %lu in lock: "
2619 "setting data to %p/%lu/%u\n", old_inode,
2620 old_inode->i_ino, old_inode->i_generation,
2622 new_inode, new_inode->i_ino, new_inode->i_generation);
2626 lock->l_ast_data = data;
2627 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2628 unlock_res_and_lock(lock);
2629 LDLM_LOCK_PUT(lock);
2632 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2633 ldlm_iterator_t replace, void *data)
2635 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2636 struct obd_device *obd = class_exp2obd(exp);
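        /* Note (added, not in the original source): ldlm_resource_iterate()
         * below walks the locks held on this object's resource and applies
         * the caller-supplied @replace iterator to each, which is how the
         * cached l_ast_data (typically an inode pointer) gets swapped.  This
         * is a reading of the call, not quoted documentation. */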
2638 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2642 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2648 /* The request was created before ldlm_cli_enqueue call. */
2649 if (rc == ELDLM_LOCK_ABORTED) {
2650 struct ldlm_reply *rep;
2652 /* swabbed by ldlm_cli_enqueue() */
2653 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2654 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2656 LASSERT(rep != NULL);
2657 if (rep->lock_policy_res1)
2658 rc = rep->lock_policy_res1;
2662 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2663 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2664 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2665 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2666 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2669 /* Call the update callback. */
2670 rc = oinfo->oi_cb_up(oinfo, rc);
2674 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2675 struct osc_enqueue_args *aa, int rc)
2677 int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2678 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2679 struct ldlm_lock *lock;
2681 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2683 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2685 /* Complete obtaining the lock procedure. */
2686 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2688 &aa->oa_ei->ei_flags,
2689 &lsm->lsm_oinfo[0]->loi_lvb,
2690 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2691 lustre_swab_ost_lvb,
2692 aa->oa_oi->oi_lockh, rc);
2694 /* Complete osc stuff. */
2695 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2697 /* Release the lock for async request. */
2698 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2699 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2701 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2702 aa->oa_oi->oi_lockh, req, aa);
2703 LDLM_LOCK_PUT(lock);
2707 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2708 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2709 * other synchronous requests, but keeping some locks while trying to obtain
2710 * others may take a considerable amount of time in the case of OST failure; and
2711 * when a client does not release a lock that other sync requests are waiting
2712 * for, that client is evicted from the cluster -- such scenarios make life
2713 * difficult, so release locks just after they are obtained. */
2714 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2715 struct obd_enqueue_info *einfo)
2717 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2718 struct obd_device *obd = exp->exp_obd;
2719 struct ldlm_reply *rep;
2720 struct ptlrpc_request *req = NULL;
2721 int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2725 /* Filesystem lock extents are extended to page boundaries so that
2726 * dealing with the page cache is a little smoother. */
2727 oinfo->oi_policy.l_extent.start -=
2728 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2729 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
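        /* Illustrative sketch (added, not in the original file): the effect
         * of the two statements above, written out with plain numbers and an
         * assumed 4096-byte page (CFS_PAGE_MASK taken to be ~(page - 1)). */
#if 0
        {
                unsigned long long start = 5000, end = 9000;
                unsigned long long page_mask = ~4095ULL;        /* assumed */

                start -= start & ~page_mask;    /* 5000 -> 4096, page start  */
                end   |= ~page_mask;            /* 9000 -> 12287, last byte  */
                                                /* of the page holding 9000  */
        }
#endif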
2731 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2734 /* Next, search for already existing extent locks that will cover us */
2735 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2736 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2739 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2742 /* I would like to be able to ASSERT here that rss <=
2743 * kms, but I can't, for reasons which are explained in
2747 /* We already have a lock, and it's referenced */
2748 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2750 /* For async requests, decref the lock. */
2751 if (einfo->ei_rqset)
2752 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2757 /* If we're trying to read, we also search for an existing PW lock. The
2758 * VFS and page cache already protect us locally, so lots of readers/
2759 * writers can share a single PW lock.
2761 * There are problems with conversion deadlocks, so instead of
2762 * converting a read lock to a write lock, we'll just enqueue a new
2765 * At some point we should cancel the read lock instead of making them
2766 * send us a blocking callback, but there are problems with canceling
2767 * locks out from other users right now, too. */
2769 if (einfo->ei_mode == LCK_PR) {
2770 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
2771 &res_id, einfo->ei_type, &oinfo->oi_policy,
2772 LCK_PW, oinfo->oi_lockh);
2774 /* FIXME: This is not incredibly elegant, but it might
2775 * be more elegant than adding another parameter to
2776 * lock_match. I want a second opinion. */
2777 /* addref the lock only if not async requests. */
2778 if (!einfo->ei_rqset)
2779 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2780 osc_set_data_with_check(oinfo->oi_lockh,
2783 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2784 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2792 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2793 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
2795 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2796 LDLM_ENQUEUE, 2, size, NULL);
2800 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2801 size[DLM_REPLY_REC_OFF] =
2802 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2803 ptlrpc_req_set_repsize(req, 3, size);
2806 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2807 einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2809 rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
2810 &oinfo->oi_policy, einfo->ei_mode,
2811 &einfo->ei_flags, einfo->ei_cb_bl,
2812 einfo->ei_cb_cp, einfo->ei_cb_gl,
2814 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2815 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2816 lustre_swab_ost_lvb, oinfo->oi_lockh,
2817 einfo->ei_rqset ? 1 : 0);
2818 if (einfo->ei_rqset) {
2820 struct osc_enqueue_args *aa;
2821 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2822 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2827 req->rq_interpret_reply = osc_enqueue_interpret;
2828 ptlrpc_set_add_req(einfo->ei_rqset, req);
2829 } else if (intent) {
2830 ptlrpc_req_finished(req);
2835 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2837 ptlrpc_req_finished(req);
2842 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2843 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2844 int *flags, void *data, struct lustre_handle *lockh)
2846 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2847 struct obd_device *obd = exp->exp_obd;
2849 int lflags = *flags;
2852 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2854 /* Filesystem lock extents are extended to page boundaries so that
2855 * dealing with the page cache is a little smoother */
2856 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2857 policy->l_extent.end |= ~CFS_PAGE_MASK;
2859 /* Next, search for already existing extent locks that will cover us */
2860 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2861 policy, mode, lockh);
2863 //if (!(*flags & LDLM_FL_TEST_LOCK))
2864 osc_set_data_with_check(lockh, data, lflags);
2867 /* If we're trying to read, we also search for an existing PW lock. The
2868 * VFS and page cache already protect us locally, so lots of readers/
2869 * writers can share a single PW lock. */
2870 if (mode == LCK_PR) {
2871 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2873 policy, LCK_PW, lockh);
2874 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2875 /* FIXME: This is not incredibly elegant, but it might
2876 * be more elegant than adding another parameter to
2877 * lock_match. I want a second opinion. */
2878 osc_set_data_with_check(lockh, data, lflags);
2879 ldlm_lock_addref(lockh, LCK_PR);
2880 ldlm_lock_decref(lockh, LCK_PW);
2886 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2887 __u32 mode, struct lustre_handle *lockh)
2891 if (unlikely(mode == LCK_GROUP))
2892 ldlm_lock_decref_and_cancel(lockh, mode);
2894 ldlm_lock_decref(lockh, mode);
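        /* Note (added, not in the original source): group (LCK_GROUP) locks
         * are presumably never left to LRU aging, so they are cancelled
         * outright here; ordinary extent locks just drop their reference and
         * stay cached for later matching. */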
2899 static int osc_cancel_unused(struct obd_export *exp,
2900 struct lov_stripe_md *lsm, int flags, void *opaque)
2902 struct obd_device *obd = class_exp2obd(exp);
2903 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2905 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2909 static int osc_join_lru(struct obd_export *exp,
2910 struct lov_stripe_md *lsm, int join)
2912 struct obd_device *obd = class_exp2obd(exp);
2913 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2915 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2918 static int osc_statfs_interpret(struct ptlrpc_request *req,
2919 struct osc_async_args *aa, int rc)
2921 struct obd_statfs *msfs;
2927 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2928 lustre_swab_obd_statfs);
2930 CERROR("Can't unpack obd_statfs\n");
2931 GOTO(out, rc = -EPROTO);
2934 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2936 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2940 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2941 __u64 max_age, struct ptlrpc_request_set *rqset)
2943 struct ptlrpc_request *req;
2944 struct osc_async_args *aa;
2945 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
2948 /* We could possibly pass max_age in the request (as an absolute
2949 * timestamp or a "seconds.usec ago") so the target can avoid doing
2950 * extra calls into the filesystem if that isn't necessary (e.g.
2951 * during mount, where that would help a bit). Having relative timestamps
2952 * is not so great if request processing is slow, while absolute
2953 * timestamps are not ideal because they need time synchronization. */
2954 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2955 OST_STATFS, 1, NULL, NULL);
2959 ptlrpc_req_set_repsize(req, 2, size);
2960 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2962 req->rq_interpret_reply = osc_statfs_interpret;
2963 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2964 aa = (struct osc_async_args *)&req->rq_async_args;
2967 ptlrpc_set_add_req(rqset, req);
2971 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2974 struct obd_statfs *msfs;
2975 struct ptlrpc_request *req;
2976 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
2979 /* We could possibly pass max_age in the request (as an absolute
2980 * timestamp or a "seconds.usec ago") so the target can avoid doing
2981 * extra calls into the filesystem if that isn't necessary (e.g.
2982 * during mount, where that would help a bit). Having relative timestamps
2983 * is not so great if request processing is slow, while absolute
2984 * timestamps are not ideal because they need time synchronization. */
2985 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2986 OST_STATFS, 1, NULL, NULL);
2990 ptlrpc_req_set_repsize(req, 2, size);
2991 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2993 rc = ptlrpc_queue_wait(req);
2997 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2998 lustre_swab_obd_statfs);
3000 CERROR("Can't unpack obd_statfs\n");
3001 GOTO(out, rc = -EPROTO);
3004 memcpy(osfs, msfs, sizeof(*osfs));
3008 ptlrpc_req_finished(req);
3012 /* Retrieve object striping information.
3014 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3015 * the maximum number of OST indices which will fit in the user buffer.
3016 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here); a usage sketch follows the function below.
3018 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3020 struct lov_user_md lum, *lumk;
3021 int rc = 0, lum_size;
3027 if (copy_from_user(&lum, lump, sizeof(lum)))
3030 if (lum.lmm_magic != LOV_USER_MAGIC)
3033 if (lum.lmm_stripe_count > 0) {
3034 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3035 OBD_ALLOC(lumk, lum_size);
3039 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3041 lum_size = sizeof(lum);
3045 lumk->lmm_object_id = lsm->lsm_object_id;
3046 lumk->lmm_stripe_count = 1;
3048 if (copy_to_user(lump, lumk, lum_size))
3052 OBD_FREE(lumk, lum_size);
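/* Illustrative sketch (added, not in the original file): the userspace side
 * of LL_IOC_LOV_GETSTRIPE as the kernel code above expects it -- the caller
 * hands in a struct lov_user_md whose lmm_magic is LOV_USER_MAGIC and whose
 * lmm_stripe_count says how many lmm_objects[] slots follow.  The header
 * name is assumed to be <lustre/lustre_user.h>; treat this as a sketch, not
 * reference documentation. */
#if 0
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <lustre/lustre_user.h>

static int print_stripe(const char *path)
{
        struct lov_user_md *lum;
        int fd, rc, count = 1;          /* this OSC reports a single stripe */

        lum = calloc(1, sizeof(*lum) + count * sizeof(lum->lmm_objects[0]));
        if (lum == NULL)
                return -1;

        lum->lmm_magic = LOV_USER_MAGIC;
        lum->lmm_stripe_count = count;  /* capacity of lmm_objects[] */

        fd = open(path, O_RDONLY);
        if (fd < 0) {
                free(lum);
                return -1;
        }

        rc = ioctl(fd, LL_IOC_LOV_GETSTRIPE, lum);
        if (rc == 0)
                printf("object id %llu\n",
                       (unsigned long long)lum->lmm_objects[0].l_object_id);

        close(fd);
        free(lum);
        return rc;
}
#endif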
3058 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3059 void *karg, void *uarg)
3061 struct obd_device *obd = exp->exp_obd;
3062 struct obd_ioctl_data *data = karg;
3066 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3069 if (!try_module_get(THIS_MODULE)) {
3070 CERROR("Can't get module. Is it alive?");
3075 case OBD_IOC_LOV_GET_CONFIG: {
3077 struct lov_desc *desc;
3078 struct obd_uuid uuid;
3082 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3083 GOTO(out, err = -EINVAL);
3085 data = (struct obd_ioctl_data *)buf;
3087 if (sizeof(*desc) > data->ioc_inllen1) {
3088 obd_ioctl_freedata(buf, len);
3089 GOTO(out, err = -EINVAL);
3092 if (data->ioc_inllen2 < sizeof(uuid)) {
3093 obd_ioctl_freedata(buf, len);
3094 GOTO(out, err = -EINVAL);
3097 desc = (struct lov_desc *)data->ioc_inlbuf1;
3098 desc->ld_tgt_count = 1;
3099 desc->ld_active_tgt_count = 1;
3100 desc->ld_default_stripe_count = 1;
3101 desc->ld_default_stripe_size = 0;
3102 desc->ld_default_stripe_offset = 0;
3103 desc->ld_pattern = 0;
3104 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3106 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3108 err = copy_to_user((void *)uarg, buf, len);
3111 obd_ioctl_freedata(buf, len);
3114 case LL_IOC_LOV_SETSTRIPE:
3115 err = obd_alloc_memmd(exp, karg);
3119 case LL_IOC_LOV_GETSTRIPE:
3120 err = osc_getstripe(karg, uarg);
3122 case OBD_IOC_CLIENT_RECOVER:
3123 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3128 case IOC_OSC_SET_ACTIVE:
3129 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3132 case OBD_IOC_POLL_QUOTACHECK:
3133 err = lquota_poll_check(quota_interface, exp,
3134 (struct if_quotacheck *)karg);
3137 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3138 cmd, cfs_curproc_comm());
3139 GOTO(out, err = -ENOTTY);
3142 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3145 module_put(THIS_MODULE);
3150 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3151 void *key, __u32 *vallen, void *val)
3154 if (!vallen || !val)
3157 if (keylen > strlen("lock_to_stripe") &&
3158 strcmp(key, "lock_to_stripe") == 0) {
3159 __u32 *stripe = val;
3160 *vallen = sizeof(*stripe);
3163 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3164 struct ptlrpc_request *req;
3166 char *bufs[2] = { NULL, key };
3167 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3169 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3170 OST_GET_INFO, 2, size, bufs);
3174 size[REPLY_REC_OFF] = *vallen;
3175 ptlrpc_req_set_repsize(req, 2, size);
3176 rc = ptlrpc_queue_wait(req);
3180 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3181 lustre_swab_ost_last_id);
3182 if (reply == NULL) {
3183 CERROR("Can't unpack OST last ID\n");
3184 GOTO(out, rc = -EPROTO);
3186 *((obd_id *)val) = *reply;
3188 ptlrpc_req_finished(req);
3194 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3197 struct llog_ctxt *ctxt;
3198 struct obd_import *imp = req->rq_import;
3204 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3207 rc = llog_initiator_connect(ctxt);
3209 CERROR("cannot establish connection for "
3210 "ctxt %p: %d\n", ctxt, rc);
3213 imp->imp_server_timeout = 1;
3214 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3215 imp->imp_pingable = 1;
3220 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3221 void *key, obd_count vallen, void *val,
3222 struct ptlrpc_request_set *set)
3224 struct ptlrpc_request *req;
3225 struct obd_device *obd = exp->exp_obd;
3226 struct obd_import *imp = class_exp2cliimp(exp);
3227 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3228 char *bufs[3] = { NULL, key, val };
3231 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3233 if (KEY_IS(KEY_NEXT_ID)) {
3234 if (vallen != sizeof(obd_id))
3236 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3237 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3238 exp->exp_obd->obd_name,
3239 obd->u.cli.cl_oscc.oscc_next_id);
3244 if (KEY_IS("unlinked")) {
3245 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3246 spin_lock(&oscc->oscc_lock);
3247 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3248 spin_unlock(&oscc->oscc_lock);
3252 if (KEY_IS(KEY_INIT_RECOV)) {
3253 if (vallen != sizeof(int))
3255 imp->imp_initial_recov = *(int *)val;
3256 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3257 exp->exp_obd->obd_name,
3258 imp->imp_initial_recov);
3262 if (KEY_IS("checksum")) {
3263 if (vallen != sizeof(int))
3265 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3272 /* We pass all other commands directly to OST. Since nobody calls osc
3273 methods directly and everybody is supposed to go through LOV, we
3274 assume lov checked invalid values for us.
3275 The only recognised values so far are evict_by_nid and mds_conn.
3276 Even if something bad goes through, we'd get a -EINVAL from OST anyway. */
3279 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3284 if (KEY_IS("mds_conn"))
3285 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3287 ptlrpc_req_set_repsize(req, 1, NULL);
3288 ptlrpc_set_add_req(set, req);
3289 ptlrpc_check_set(set);
3295 static struct llog_operations osc_size_repl_logops = {
3296 lop_cancel: llog_obd_repl_cancel
3299 static struct llog_operations osc_mds_ost_orig_logops;
3300 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3301 int count, struct llog_catid *catid,
3302 struct obd_uuid *uuid)
3307 spin_lock(&obd->obd_dev_lock);
3308 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3309 osc_mds_ost_orig_logops = llog_lvfs_ops;
3310 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3311 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3312 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3313 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3315 spin_unlock(&obd->obd_dev_lock);
3317 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3318 &catid->lci_logid, &osc_mds_ost_orig_logops);
3320 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3324 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3325 &osc_size_repl_logops);
3327 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3330 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3331 obd->obd_name, tgt->obd_name, count, catid, rc);
3332 CERROR("logid "LPX64":0x%x\n",
3333 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3338 static int osc_llog_finish(struct obd_device *obd, int count)
3340 struct llog_ctxt *ctxt;
3341 int rc = 0, rc2 = 0;
3344 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3346 rc = llog_cleanup(ctxt);
3348 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3350 rc2 = llog_cleanup(ctxt);
3357 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3358 struct obd_uuid *cluuid,
3359 struct obd_connect_data *data)
3361 struct client_obd *cli = &obd->u.cli;
3363 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3366 client_obd_list_lock(&cli->cl_loi_list_lock);
3367 data->ocd_grant = cli->cl_avail_grant ?:
3368 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
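                /* Note (added, not in the original source): on reconnect we
                 * re-request whatever grant we still believe we hold; if none
                 * is left, we ask for two full RPCs worth of pages
                 * (2 * cl_max_pages_per_rpc, converted to bytes by the page
                 * shift) as a restart value. */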
3369 lost_grant = cli->cl_lost_grant;
3370 cli->cl_lost_grant = 0;
3371 client_obd_list_unlock(&cli->cl_loi_list_lock);
3373 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3374 "cl_lost_grant: %ld\n", data->ocd_grant,
3375 cli->cl_avail_grant, lost_grant);
3376 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3377 " ocd_grant: %d\n", data->ocd_connect_flags,
3378 data->ocd_version, data->ocd_grant);
3384 static int osc_disconnect(struct obd_export *exp)
3386 struct obd_device *obd = class_exp2obd(exp);
3387 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3390 if (obd->u.cli.cl_conn_count == 1)
3391 /* flush any remaining cancel messages out to the target */
3392 llog_sync(ctxt, exp);
3394 rc = client_disconnect_export(exp);
3398 static int osc_import_event(struct obd_device *obd,
3399 struct obd_import *imp,
3400 enum obd_import_event event)
3402 struct client_obd *cli;
3406 LASSERT(imp->imp_obd == obd);
3409 case IMP_EVENT_DISCON: {
3410 /* Only do this on the MDS OSCs */
3411 if (imp->imp_server_timeout) {
3412 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3414 spin_lock(&oscc->oscc_lock);
3415 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3416 spin_unlock(&oscc->oscc_lock);
3421 case IMP_EVENT_INACTIVE: {
3422 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3425 case IMP_EVENT_INVALIDATE: {
3426 struct ldlm_namespace *ns = obd->obd_namespace;
3430 client_obd_list_lock(&cli->cl_loi_list_lock);
3431 cli->cl_avail_grant = 0;
3432 cli->cl_lost_grant = 0;
3433 /* all pages go to failing rpcs due to the invalid import */
3434 osc_check_rpcs(cli);
3435 client_obd_list_unlock(&cli->cl_loi_list_lock);
3437 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3441 case IMP_EVENT_ACTIVE: {
3442 /* Only do this on the MDS OSCs */
3443 if (imp->imp_server_timeout) {
3444 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3446 spin_lock(&oscc->oscc_lock);
3447 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3448 spin_unlock(&oscc->oscc_lock);
3450 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3453 case IMP_EVENT_OCD: {
3454 struct obd_connect_data *ocd = &imp->imp_connect_data;
3456 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3457 osc_init_grant(&obd->u.cli, ocd);
3460 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3461 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3463 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3467 CERROR("Unknown import event %d\n", event);
3473 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3479 rc = ptlrpcd_addref();
3483 rc = client_obd_setup(obd, len, buf);
3487 struct lprocfs_static_vars lvars;
3488 struct client_obd *cli = &obd->u.cli;
3490 lprocfs_init_vars(osc, &lvars);
3491 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3492 lproc_osc_attach_seqstat(obd);
3493 ptlrpc_lprocfs_register_obd(obd);
3497 /* We need to allocate a few more requests, because
3498 brw_interpret_oap tries to create new requests before freeing
3499 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3500 reserved, but I am afraid that might waste too much RAM
3501 in practice, so 2 is just my guess and should still work. */
3502 cli->cl_import->imp_rq_pool =
3503 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3505 ptlrpc_add_rqs_to_pool);
3511 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3517 case OBD_CLEANUP_EARLY: {
3518 struct obd_import *imp;
3519 imp = obd->u.cli.cl_import;
3520 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3521 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3522 ptlrpc_deactivate_import(imp);
3525 case OBD_CLEANUP_EXPORTS: {
3526 /* If we set up but never connected, the
3527 client import will not have been cleaned. */
3528 if (obd->u.cli.cl_import) {
3529 struct obd_import *imp;
3530 imp = obd->u.cli.cl_import;
3531 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3533 ptlrpc_invalidate_import(imp);
3534 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3535 class_destroy_import(imp);
3536 obd->u.cli.cl_import = NULL;
3540 case OBD_CLEANUP_SELF_EXP:
3541 rc = obd_llog_finish(obd, 0);
3543 CERROR("failed to cleanup llogging subsystems\n");
3545 case OBD_CLEANUP_OBD:
3551 int osc_cleanup(struct obd_device *obd)
3553 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3557 ptlrpc_lprocfs_unregister_obd(obd);
3558 lprocfs_obd_cleanup(obd);
3560 spin_lock(&oscc->oscc_lock);
3561 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3562 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3563 spin_unlock(&oscc->oscc_lock);
3565 /* free the memory used by the osc quota cache */
3566 lquota_cleanup(quota_interface, obd);
3568 rc = client_obd_cleanup(obd);
3574 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3576 struct lustre_cfg *lcfg = buf;
3577 struct lprocfs_static_vars lvars;
3580 lprocfs_init_vars(osc, &lvars);
3582 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3586 struct obd_ops osc_obd_ops = {
3587 .o_owner = THIS_MODULE,
3588 .o_setup = osc_setup,
3589 .o_precleanup = osc_precleanup,
3590 .o_cleanup = osc_cleanup,
3591 .o_add_conn = client_import_add_conn,
3592 .o_del_conn = client_import_del_conn,
3593 .o_connect = client_connect_import,
3594 .o_reconnect = osc_reconnect,
3595 .o_disconnect = osc_disconnect,
3596 .o_statfs = osc_statfs,
3597 .o_statfs_async = osc_statfs_async,
3598 .o_packmd = osc_packmd,
3599 .o_unpackmd = osc_unpackmd,
3600 .o_create = osc_create,
3601 .o_destroy = osc_destroy,
3602 .o_getattr = osc_getattr,
3603 .o_getattr_async = osc_getattr_async,
3604 .o_setattr = osc_setattr,
3605 .o_setattr_async = osc_setattr_async,
3607 .o_brw_async = osc_brw_async,
3608 .o_prep_async_page = osc_prep_async_page,
3609 .o_queue_async_io = osc_queue_async_io,
3610 .o_set_async_flags = osc_set_async_flags,
3611 .o_queue_group_io = osc_queue_group_io,
3612 .o_trigger_group_io = osc_trigger_group_io,
3613 .o_teardown_async_page = osc_teardown_async_page,
3614 .o_punch = osc_punch,
3616 .o_enqueue = osc_enqueue,
3617 .o_match = osc_match,
3618 .o_change_cbdata = osc_change_cbdata,
3619 .o_cancel = osc_cancel,
3620 .o_cancel_unused = osc_cancel_unused,
3621 .o_join_lru = osc_join_lru,
3622 .o_iocontrol = osc_iocontrol,
3623 .o_get_info = osc_get_info,
3624 .o_set_info_async = osc_set_info_async,
3625 .o_import_event = osc_import_event,
3626 .o_llog_init = osc_llog_init,
3627 .o_llog_finish = osc_llog_finish,
3628 .o_process_config = osc_process_config,
3631 int __init osc_init(void)
3633 struct lprocfs_static_vars lvars;
3637 lprocfs_init_vars(osc, &lvars);
3639 request_module("lquota");
3640 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3641 lquota_init(quota_interface);
3642 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3644 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3647 if (quota_interface)
3648 PORTAL_SYMBOL_PUT(osc_quota_interface);
3656 static void /*__exit*/ osc_exit(void)
3658 lquota_exit(quota_interface);
3659 if (quota_interface)
3660 PORTAL_SYMBOL_PUT(osc_quota_interface);
3662 class_unregister_type(LUSTRE_OSC_NAME);
3665 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3666 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3667 MODULE_LICENSE("GPL");
3669 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);