/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
 * Author Peter Braam <braam@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 *
 * For testing and management it is treated as an obd_device,
 * although it does not export a full OBD method table (the
 * requests are coming in over the wire, so object target modules
 * do not have a full method table.)
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <libcfs/kp30.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        lmm_size = sizeof(**lmmp);

        OBD_FREE(*lmmp, lmm_size);

        OBD_ALLOC(*lmmp, lmm_size);

        LASSERT(lsm->lsm_object_id);
        (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));

        /* XXX LOV_MAGIC etc check? */
        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");

        lsm_size = lov_stripe_md_size(1);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);

        OBD_ALLOC(*lsmp, lsm_size);
        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
        if ((*lsmp)->lsm_oinfo[0] == NULL) {
                OBD_FREE(*lsmp, lsm_size);
        loi_init((*lsmp)->lsm_oinfo[0]);

        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
        LASSERT((*lsmp)->lsm_object_id);

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

 out:
        ptlrpc_req_finished(req);
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

 out:
        ptlrpc_req_finished(req);
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
 out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
        ptlrpc_req_set_repsize(req, 2, size);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        rc = obd_alloc_memmd(exp, &lsm);

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;

        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                       sizeof(oti->oti_onecookie));
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
 out_req:
        ptlrpc_req_finished(req);
 out:
        obd_free_memmd(exp, &lsm);
static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
 out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

 out:
        ptlrpc_req_finished(req);
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oa, sizeof(*oa));
        ptlrpc_req_set_repsize(req, 2, size);

        ptlrpcd_add_req(req);
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
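
/* Hedged worked example of o_undirty above (illustrative numbers, not from
 * the original source): with 4096-byte pages, cl_max_pages_per_rpc = 256
 * and cl_max_rpcs_in_flight = 8, max_in_flight = (256 << 12) * (8 + 1) =
 * 9 MiB, so the client announces it could still dirty at least 9 MiB even
 * if cl_dirty_max is smaller. */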
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong. Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }
}
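
/* Hedged worked example of the short-write rounding above (illustrative
 * numbers, not from the original source): with CFS_PAGE_SIZE = 4096, an
 * OST block size of 1024 and a short write of pga->count = 1024 bytes at
 * page offset 100:
 *
 *     count = 1024 + (100 & 1023)  = 1124    (round start down to a block)
 *     end   = (100 + 1024) & 1023  = 100     (partial tail block)
 *     count += 1024 - 100                   -> 2048 (round end up to a block)
 *
 * so two full blocks are charged on the OST side and the client records
 * 4096 - 2048 = 2048 bytes of lost grant for this page. */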
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    ((atomic_read(&obd_dirty_pages) + 1) > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);

                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
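
/* Hedged example (not from the source): a 3-page, 12288-byte read that
 * returns nob_read = 6144 leaves page 0 untouched, zero-fills the second
 * half of page 1 in the first loop, and zero-fills all of page 2 in the
 * second loop, giving the usual read-past-EOF semantics of zeroes beyond
 * the written data. */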
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
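
/* Hedged example: two brw_pages covering [0, 4096) and [4096, 8192) with
 * identical flags merge into a single remote niobuf; a hole between them,
 * or differing flags, forces a separate niobuf (a new I/O fragment). */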
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT(pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool : NULL;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc,
                                   4, size, NULL, pool);

        /* FIXME bug 249. Also see bug 7198 */
        if (cli->cl_import->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT(page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __KERNEL__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }
        }

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_retries = 5; /* retry for checksum errors; lprocfs? */
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga)
{
        __u32 new_cksum;
        char *msg;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        new_cksum = osc_checksum_bulk(nob, page_count, pga);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original";

        LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
                       " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
                       msg, libcfs_nid2str(peer->nid),
                       oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                       oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
                       oa->o_id,
                       oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                       pga[0]->off,
                       pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);

        return 1;
}
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                RETURN(rc);
        }
        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga);

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s inum "
                                       LPU64"/"LPU64" object "LPU64"/"LPU64
                                       " extent ["LPU64"-"LPU64"]\n",
                                       req->rq_import->imp_obd->obd_name,
                                       libcfs_nid2str(peer->nid),
                                       body->oa.o_valid & OBD_MD_FLFID ?
                                               body->oa.o_fid : (__u64)0,
                                       body->oa.o_valid & OBD_MD_FLFID ?
                                               body->oa.o_generation : (__u64)0,
                                       body->oa.o_id,
                                       body->oa.o_valid & OBD_MD_FLGROUP ?
                                               body->oa.o_gr : (__u64)0,
                                       aa->aa_ppga[0]->off,
                                       aa->aa_ppga[aa->aa_page_count-1]->off +
                                       aa->aa_ppga[aa->aa_page_count-1]->count -
                                       1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        aa->aa_oa->o_cksum = client_cksum;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        }

        memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));

        RETURN(rc);
}
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga)
{
        struct ptlrpc_request *request;
        int rc, retries = 5; /* lprocfs? */

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(request);

        if (rc == -ETIMEDOUT && request->rq_resend) {
                DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
                ptlrpc_req_finished(request);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(request, rc);

        ptlrpc_req_finished(request);
        if (rc == -EAGAIN) {
                if (retries-- > 0)
                        goto restart_bulk;
                rc = -EIO;
        }
        RETURN(rc);
}
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc;

        if (aa->aa_retries-- <= 0) {
                CERROR("too many checksum retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for checksum error");
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_mark_interrupted(oap->oap_request);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                          OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req);
        if (rc)
                RETURN(rc);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);
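
        /* note: the struct assignment of rq_async_args above also copied the
         * embedded aa_oaps list_head, whose prev/next pointers still referred
         * to the old request's storage; the INIT_LIST_HEAD()/list_splice()
         * pair re-homes the oaps onto new_aa and leaves the old list empty */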
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        ptlrpc_set_add_req(set, new_req);
        RETURN(0);
}

static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int i;

        rc = osc_brw_fini_request(request, rc);
        if (rc == -EAGAIN) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        spin_lock(&aa->aa_cli->cl_loi_list_lock);
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        spin_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *request;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int rc, i;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);
                }
                spin_unlock(&cli->cl_loi_list_lock);
        }

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request);
        if (rc == 0) {
                request->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, request);
        } else if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                spin_unlock(&cli->cl_loi_list_lock);
        }

        RETURN(rc);
}
/* ugh, we want disk allocation on the target to happen in offset order. we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
        int stride, i, j;
        struct brw_page *tmp;

        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
                ;
        do {
                stride /= 3;
                for (i = stride ; i < num ; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j-stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
}
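
/* A minimal, self-contained sketch of the same shellsort on a plain int
 * array (illustrative only, kept out of the build with #if 0). The stride
 * sequence grows 1, 4, 13, 40, ... and shrinks by /3 per pass, ending with
 * a plain insertion sort at stride 1, exactly as sort_brw_pages() above. */
#if 0
static void shellsort_ints(int *a, int num)
{
        int stride, i, j, tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}
#endif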
static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
{
        int count = 1, offset, i = 0;

        LASSERT(pages > 0);
        offset = pg[i]->off & (~CFS_PAGE_MASK);
        for (;;) {
                pages--;
                if (pages == 0)         /* that's all */
                        return count;
                if (offset + pg[i]->count < CFS_PAGE_SIZE)
                        return count;   /* doesn't end on page boundary */
                i++;
                offset = pg[i]->off & (~CFS_PAGE_MASK);
                if (offset != 0)        /* doesn't start on page boundary */
                        return count;
                count++;
        }
}
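
/* Hedged example: if the first page covers [0, 4096) and the second covers
 * [4096, 6144), the second ends mid-page, so max_unfragmented_pages()
 * stops after these two; appending a third page would hand the network a
 * "fragmented" array it could not transfer as a single RDMA. */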
static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
{
        struct brw_page **ppga;
        int i;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);
        if (ppga == NULL)
                return NULL;

        for (i = 0; i < count; i++)
                ppga[i] = pga + i;
        return ppga;
}

static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        saved_oa = obdo_alloc();
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

 out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                obdo_free(saved_oa);

        RETURN(rc);
}
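
/* Hedged example of the chunking loop above (illustrative numbers, not
 * from the source): 600 dirty pages with cl_max_pages_per_rpc = 256 would
 * go out as BRW RPCs of 256, 256 and 88 pages, with max_unfragmented_pages()
 * possibly trimming a chunk further so that each RPC can move as one
 * unfragmented transfer. */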
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set);

                if (rc != 0) {
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
 out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
static void osc_check_rpcs(struct client_obd *cli);

/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting. Writeback completes or truncate happens before
 * writing starts. Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}

/* This maintains the lists of pending pages to read/write for a given object
 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages. recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued. this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space. as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);
}

/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}

static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
/* this is called when a sync waiter receives an interruption. Its job is to
 * get the caller woken as soon as possible. If its page hasn't been put in an
 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * completes. */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);
        loi = oap->oap_loi;

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
/* this is trying to propagate async writeback errors back up to the
 * application. As an async write fails we record the error code for later if
 * the app does an fsync. As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
                           int rc)
{
        if (rc) {
                if (!ar->ar_rc)
                        ar->ar_rc = rc;

                ar->ar_force_sync = 1;
                ar->ar_min_xid = ptlrpc_sample_next_xid();
                return;
        }

        if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
}
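
/* Hedged timeline for the above: when a write RPC fails, ar_rc latches the
 * error and ar_force_sync starts forcing synchronous writes; ar_min_xid is
 * sampled so that only a *newer* write (xid >= ar_min_xid) completing
 * without error clears ar_force_sync again. */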
static void osc_oap_to_pending(struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
        else
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}

/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
        }

        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_async_page *oap, *tmp;
        struct client_obd *cli;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
        if (rc == -EAGAIN) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        obdo_free(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        int i, rc;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST). If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps. Sadly, there is no obvious
         * way to do this in a single call. bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

        RETURN(req);

 out:
        OBD_FREE(pga, sizeof(*pga) * page_count);
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned starting_offset = 0;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it. commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns. if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list). we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                       "instead of ready\n", oap,
                                       oap->oap_page, rc);

                        /* llite is telling us that the page is still
                         * in commit_write and that we should try
                         * and put it in an rpc again later. we
                         * break out of the loop so we don't create
                         * a hole in the sequence of pages in the rpc
                         * stream. */

                        /* the io isn't needed.. tell the checks
                         * below to complete the rpc with EINTR */
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                        oap->oap_count = -EINTR;

                        oap->oap_async_flags |= ASYNC_READY;

                        LASSERTF(0, "oap %p page %p returned %d "
                                 "from make_ready\n", oap,
                                 oap->oap_page, rc);
                }

                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off + oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data, cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        client_obd_list_unlock(&cli->cl_loi_list_lock);
        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  -EINTR);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
#define LOI_DEBUG(LOI, STR, args...)                                    \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,          \
               !list_empty(&(LOI)->loi_cli_item),                       \
               (LOI)->loi_write_lop.lop_num_pending,                    \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),           \
               (LOI)->loi_read_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),            \
               args)

/* This is called by osc_check_rpcs() to find which objects have pages that
 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2115 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2118 /* first return all objects which we already know to have
2119 * pages ready to be stuffed into rpcs */
2120 if (!list_empty(&cli->cl_loi_ready_list))
2121 RETURN(list_entry(cli->cl_loi_ready_list.next,
2122 struct lov_oinfo, loi_cli_item));
2124 /* then if we have cache waiters, return all objects with queued
2125 * writes. This is especially important when many small files
2126 * have filled up the cache and not been fired into rpcs because
2127          * they don't pass the nr_pending/object threshold */
2128 if (!list_empty(&cli->cl_cache_waiters) &&
2129 !list_empty(&cli->cl_loi_write_list))
2130 RETURN(list_entry(cli->cl_loi_write_list.next,
2131 struct lov_oinfo, loi_write_item));
2133 /* then return all queued objects when we have an invalid import
2134 * so that they get flushed */
2135 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2136 if (!list_empty(&cli->cl_loi_write_list))
2137 RETURN(list_entry(cli->cl_loi_write_list.next,
2138 struct lov_oinfo, loi_write_item));
2139 if (!list_empty(&cli->cl_loi_read_list))
2140 RETURN(list_entry(cli->cl_loi_read_list.next,
2141 struct lov_oinfo, loi_read_item));
2146 /* called with the loi list lock held */
2147 static void osc_check_rpcs(struct client_obd *cli)
2149 struct lov_oinfo *loi;
2150 int rc = 0, race_counter = 0;
2153 while ((loi = osc_next_loi(cli)) != NULL) {
2154 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2156 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2159 /* attempt some read/write balancing by alternating between
2160 * reads and writes in an object. The makes_rpc checks here
2161 * would be redundant if we were getting read/write work items
2162                  * instead of objects. We don't want send_oap_rpc to drain a
2163                  * partial read pending queue when we're given this object to
2164                  * do write I/O on while there are cache waiters */
2165 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2166 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2167 &loi->loi_write_lop);
2175 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2176 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2177 &loi->loi_read_lop);
2186                 /* attempt some inter-object balancing by issuing rpcs
2187 * for each object in turn */
2188 if (!list_empty(&loi->loi_cli_item))
2189 list_del_init(&loi->loi_cli_item);
2190 if (!list_empty(&loi->loi_write_item))
2191 list_del_init(&loi->loi_write_item);
2192 if (!list_empty(&loi->loi_read_item))
2193 list_del_init(&loi->loi_read_item);
2195 loi_list_maint(cli, loi);
2197                 /* send_oap_rpc returns 0 when make_ready tells it to
2198                  * back off. llite's make_ready does this when it tries
2199                  * to lock a page queued for write that is already locked.
2200                  * We want to try sending rpcs from many objects, but we
2201                  * don't want to spin failing with 0. */
2202 if (race_counter == 10)
2208 /* we're trying to queue a page in the osc so we're subject to the
2209 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2210 * If the osc's queued pages are already at that limit, then we want to sleep
2211 * until there is space in the osc's queue for us. We also may be waiting for
2212 * write credits from the OST if there are RPCs in flight that may return some
2213 * before we fall back to sync writes.
2215  * We need this to know our allocation was granted in the presence of signals. */
2216 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2220 client_obd_list_lock(&cli->cl_loi_list_lock);
2221 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2222 client_obd_list_unlock(&cli->cl_loi_list_lock);
2226 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2227 * grant or cache space. */
2228 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2229 struct osc_async_page *oap)
2231 struct osc_cache_waiter ocw;
2232 struct l_wait_info lwi = { 0 };
2235 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2236 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2237 cli->cl_dirty_max, obd_max_dirty_pages,
2238 cli->cl_lost_grant, cli->cl_avail_grant);
2240 /* force the caller to try sync io. this can jump the list
2241 * of queued writes and create a discontiguous rpc stream */
2242 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2243 loi->loi_ar.ar_force_sync)
2246 /* Hopefully normal case - cache space and write credits available */
2247 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2248 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2249 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2250 /* account for ourselves */
2251 osc_consume_write_grant(cli, &oap->oap_brw_page);
2255 /* Make sure that there are write rpcs in flight to wait for. This
2256 * is a little silly as this object may not have any pending but
2257 * other objects sure might. */
2258 if (cli->cl_w_in_flight) {
2259 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2260 cfs_waitq_init(&ocw.ocw_waitq);
2264 loi_list_maint(cli, loi);
2265 osc_check_rpcs(cli);
2266 client_obd_list_unlock(&cli->cl_loi_list_lock);
2268 CDEBUG(D_CACHE, "sleeping for cache space\n");
2269 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2271 client_obd_list_lock(&cli->cl_loi_list_lock);
2272 if (!list_empty(&ocw.ocw_entry)) {
2273 list_del(&ocw.ocw_entry);
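/* Initialize the osc_async_page that the caller embeds in its own per-page
 * state; the cookie returned in *res is what the queue/flag/teardown methods
 * below expect back. Called with a NULL page it only reports how much space
 * to reserve for that cookie. */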
2282 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2283 struct lov_oinfo *loi, cfs_page_t *page,
2284 obd_off offset, struct obd_async_page_ops *ops,
2285 void *data, void **res)
2287 struct osc_async_page *oap;
2290         if (!page)
2291                 return size_round(sizeof(*oap));
2293         oap = *res;
2294         oap->oap_magic = OAP_MAGIC;
2295 oap->oap_cli = &exp->exp_obd->u.cli;
2298 oap->oap_caller_ops = ops;
2299 oap->oap_caller_data = data;
2301 oap->oap_page = page;
2302 oap->oap_obj_off = offset;
2304 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2305 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2306 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2308 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2310 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
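/* Recover an osc_async_page from an opaque cookie; oap_magic catches cookies
 * that were never initialized by osc_prep_async_page(). Callers follow the
 * usual ERR_PTR convention:
 *
 *      oap = oap_from_cookie(cookie);
 *      if (IS_ERR(oap))
 *              RETURN(PTR_ERR(oap));
 */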
2314 struct osc_async_page *oap_from_cookie(void *cookie)
2316 struct osc_async_page *oap = cookie;
2317 if (oap->oap_magic != OAP_MAGIC)
2318 return ERR_PTR(-EINVAL);
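/* Queue a prepared page for asynchronous I/O. Writes must first pass the
 * quota check and the dirty-cache accounting in osc_enter_cache(); once
 * accepted, the page moves to the object's pending list and osc_check_rpcs()
 * decides when to fire it in an RPC. */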
2322 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2323 struct lov_oinfo *loi, void *cookie,
2324 int cmd, obd_off off, int count,
2325 obd_flag brw_flags, enum async_flags async_flags)
2327 struct client_obd *cli = &exp->exp_obd->u.cli;
2328 struct osc_async_page *oap;
2332 oap = oap_from_cookie(cookie);
2333         if (IS_ERR(oap))
2334                 RETURN(PTR_ERR(oap));
2336 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2339 if (!list_empty(&oap->oap_pending_item) ||
2340 !list_empty(&oap->oap_urgent_item) ||
2341 !list_empty(&oap->oap_rpc_item))
2344 /* check if the file's owner/group is over quota */
2345 #ifdef HAVE_QUOTA_SUPPORT
2346 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2347 struct obd_async_page_ops *ops;
2354 ops = oap->oap_caller_ops;
2355 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2356 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2367 loi = lsm->lsm_oinfo[0];
2369 client_obd_list_lock(&cli->cl_loi_list_lock);
2372 oap->oap_page_off = off;
2373 oap->oap_count = count;
2374 oap->oap_brw_flags = brw_flags;
2375 oap->oap_async_flags = async_flags;
2377 if (cmd & OBD_BRW_WRITE) {
2378 rc = osc_enter_cache(cli, loi, oap);
2380 client_obd_list_unlock(&cli->cl_loi_list_lock);
2385 osc_oap_to_pending(oap);
2386 loi_list_maint(cli, loi);
2388 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2391 osc_check_rpcs(cli);
2392 client_obd_list_unlock(&cli->cl_loi_list_lock);
2397 /* aka (~was & now & flag), but this is more clear :) */
2398 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
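/* Promote flags on a page that is already queued. Only clear-to-set
 * transitions matter: newly-set ASYNC_READY marks the page sendable as-is,
 * and newly-set ASYNC_URGENT moves it onto the urgent list so the next RPC
 * picks it up sooner. A rough caller sketch (assuming the usual obd_*
 * wrapper over o_set_async_flags):
 *
 *      rc = obd_set_async_flags(exp, lsm, NULL, cookie,
 *                               ASYNC_READY | ASYNC_URGENT);
 */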
2400 static int osc_set_async_flags(struct obd_export *exp,
2401 struct lov_stripe_md *lsm,
2402 struct lov_oinfo *loi, void *cookie,
2403 obd_flag async_flags)
2405 struct client_obd *cli = &exp->exp_obd->u.cli;
2406 struct loi_oap_pages *lop;
2407 struct osc_async_page *oap;
2411 oap = oap_from_cookie(cookie);
2412         if (IS_ERR(oap))
2413                 RETURN(PTR_ERR(oap));
2415         /*
2416          * bug 7311: OST-side locking is only supported for liblustre for now
2417 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2418 * implementation has to handle case where OST-locked page was picked
2419          * up by, e.g., ->writepage().
2420          */
2421 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2422         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2423                                      * tread here */
2425 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2429 loi = lsm->lsm_oinfo[0];
2431 if (oap->oap_cmd & OBD_BRW_WRITE) {
2432 lop = &loi->loi_write_lop;
2434 lop = &loi->loi_read_lop;
2437 client_obd_list_lock(&cli->cl_loi_list_lock);
2439 if (list_empty(&oap->oap_pending_item))
2440 GOTO(out, rc = -EINVAL);
2442 if ((oap->oap_async_flags & async_flags) == async_flags)
2445 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2446 oap->oap_async_flags |= ASYNC_READY;
2448 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2449 if (list_empty(&oap->oap_rpc_item)) {
2450 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2451 loi_list_maint(cli, loi);
2455 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2456 oap->oap_async_flags);
2458 osc_check_rpcs(cli);
2459 client_obd_list_unlock(&cli->cl_loi_list_lock);
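/* Like osc_queue_async_io(), but the page joins an obd_io_group: it parks on
 * lop_pending_group and is not eligible for RPCs until the whole group is
 * released by osc_trigger_group_io(). */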
2463 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2464 struct lov_oinfo *loi,
2465 struct obd_io_group *oig, void *cookie,
2466 int cmd, obd_off off, int count,
2468 obd_flag async_flags)
2470 struct client_obd *cli = &exp->exp_obd->u.cli;
2471 struct osc_async_page *oap;
2472 struct loi_oap_pages *lop;
2476 oap = oap_from_cookie(cookie);
2477         if (IS_ERR(oap))
2478                 RETURN(PTR_ERR(oap));
2480 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2483 if (!list_empty(&oap->oap_pending_item) ||
2484 !list_empty(&oap->oap_urgent_item) ||
2485 !list_empty(&oap->oap_rpc_item))
2489 loi = lsm->lsm_oinfo[0];
2491 client_obd_list_lock(&cli->cl_loi_list_lock);
2494 oap->oap_page_off = off;
2495 oap->oap_count = count;
2496 oap->oap_brw_flags = brw_flags;
2497 oap->oap_async_flags = async_flags;
2499 if (cmd & OBD_BRW_WRITE)
2500 lop = &loi->loi_write_lop;
2502 lop = &loi->loi_read_lop;
2504 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2505 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2507 rc = oig_add_one(oig, &oap->oap_occ);
2510 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2511 oap, oap->oap_page, rc);
2513 client_obd_list_unlock(&cli->cl_loi_list_lock);
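/* Move every page parked on a group's pending list onto the regular pending
 * list so the normal RPC engine will send them. */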
2518 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2519 struct loi_oap_pages *lop, int cmd)
2521 struct list_head *pos, *tmp;
2522 struct osc_async_page *oap;
2524 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2525 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2526 list_del(&oap->oap_pending_item);
2527 osc_oap_to_pending(oap);
2529 loi_list_maint(cli, loi);
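/* Release a group's read and write pages into the pending lists and kick
 * osc_check_rpcs() to start sending them. */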
2532 static int osc_trigger_group_io(struct obd_export *exp,
2533 struct lov_stripe_md *lsm,
2534 struct lov_oinfo *loi,
2535 struct obd_io_group *oig)
2537 struct client_obd *cli = &exp->exp_obd->u.cli;
2541 loi = lsm->lsm_oinfo[0];
2543 client_obd_list_lock(&cli->cl_loi_list_lock);
2545 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2546 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2548 osc_check_rpcs(cli);
2549 client_obd_list_unlock(&cli->cl_loi_list_lock);
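/* Undo osc_queue_async_io() for a page that is no longer wanted: return its
 * cache grant and unlink it from the urgent and pending lists. A page that
 * is already part of an RPC cannot be torn down; that fails with -EBUSY. */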
2554 static int osc_teardown_async_page(struct obd_export *exp,
2555 struct lov_stripe_md *lsm,
2556 struct lov_oinfo *loi, void *cookie)
2558 struct client_obd *cli = &exp->exp_obd->u.cli;
2559 struct loi_oap_pages *lop;
2560 struct osc_async_page *oap;
2564 oap = oap_from_cookie(cookie);
2565         if (IS_ERR(oap))
2566                 RETURN(PTR_ERR(oap));
2569 loi = lsm->lsm_oinfo[0];
2571 if (oap->oap_cmd & OBD_BRW_WRITE) {
2572 lop = &loi->loi_write_lop;
2574 lop = &loi->loi_read_lop;
2577 client_obd_list_lock(&cli->cl_loi_list_lock);
2579 if (!list_empty(&oap->oap_rpc_item))
2580 GOTO(out, rc = -EBUSY);
2582 osc_exit_cache(cli, oap, 0);
2583 osc_wake_cache_waiters(cli);
2585 if (!list_empty(&oap->oap_urgent_item)) {
2586 list_del_init(&oap->oap_urgent_item);
2587 oap->oap_async_flags &= ~ASYNC_URGENT;
2589 if (!list_empty(&oap->oap_pending_item)) {
2590 list_del_init(&oap->oap_pending_item);
2591 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2593 loi_list_maint(cli, loi);
2595 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2597 client_obd_list_unlock(&cli->cl_loi_list_lock);
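/* Attach the caller's ast data (an inode, in the Linux client) to a matched
 * lock, complaining loudly if the lock already carries data for a different
 * inode that is not being freed. */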
2601 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2604 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2607 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2610 lock_res_and_lock(lock);
2613 /* Liang XXX: Darwin and Winnt checking should be added */
2614 if (lock->l_ast_data && lock->l_ast_data != data) {
2615 struct inode *new_inode = data;
2616 struct inode *old_inode = lock->l_ast_data;
2617 if (!(old_inode->i_state & I_FREEING))
2618 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2619 LASSERTF(old_inode->i_state & I_FREEING,
2620 "Found existing inode %p/%lu/%u state %lu in lock: "
2621 "setting data to %p/%lu/%u\n", old_inode,
2622 old_inode->i_ino, old_inode->i_generation,
2623                          old_inode->i_state,
2624                          new_inode, new_inode->i_ino, new_inode->i_generation);
2628 lock->l_ast_data = data;
2629 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2630 unlock_res_and_lock(lock);
2631 LDLM_LOCK_PUT(lock);
2634 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2635 ldlm_iterator_t replace, void *data)
2637 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2638 struct obd_device *obd = class_exp2obd(exp);
2640 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2644 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2650         /* The request was created before the ldlm_cli_enqueue() call. */
2651 if (rc == ELDLM_LOCK_ABORTED) {
2652 struct ldlm_reply *rep;
2654 /* swabbed by ldlm_cli_enqueue() */
2655 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2656 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2658 LASSERT(rep != NULL);
2659 if (rep->lock_policy_res1)
2660 rc = rep->lock_policy_res1;
2664 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2665 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2666 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2667 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2668 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2671 /* Call the update callback. */
2672 rc = oinfo->oi_cb_up(oinfo, rc);
2676 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2677 struct osc_enqueue_args *aa, int rc)
2679 int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2680 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2681 struct ldlm_lock *lock;
2683         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2684          * be valid. */
2685 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2687         /* Complete the lock acquisition procedure. */
2688 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2690 &aa->oa_ei->ei_flags,
2691 &lsm->lsm_oinfo[0]->loi_lvb,
2692 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2693 lustre_swab_ost_lvb,
2694 aa->oa_oi->oi_lockh, rc);
2696 /* Complete osc stuff. */
2697 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2699 /* Release the lock for async request. */
2700 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2701 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2703 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2704 aa->oa_oi->oi_lockh, req, aa);
2705 LDLM_LOCK_PUT(lock);
2709 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a lock
2710  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2711  * other synchronous requests; however, holding some locks while trying to obtain
2712  * others may take a considerable amount of time in the case of OST failure, and
2713  * when a client fails to release a lock that other sync requests are waiting on,
2714  * that client is evicted from the cluster -- such scenarios make life difficult,
2715  * so release locks just after they are obtained. */
2716 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2717 struct obd_enqueue_info *einfo)
2719 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2720 struct obd_device *obd = exp->exp_obd;
2721 struct ldlm_reply *rep;
2722 struct ptlrpc_request *req = NULL;
2723 int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2727 /* Filesystem lock extents are extended to page boundaries so that
2728 * dealing with the page cache is a little smoother. */
2729 oinfo->oi_policy.l_extent.start -=
2730 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2731 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2733 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2736 /* Next, search for already existing extent locks that will cover us */
2737 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2738 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2741 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2744 /* I would like to be able to ASSERT here that rss <=
2745 * kms, but I can't, for reasons which are explained in
2749 /* We already have a lock, and it's referenced */
2750 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2752 /* For async requests, decref the lock. */
2753 if (einfo->ei_rqset)
2754 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2759 /* If we're trying to read, we also search for an existing PW lock. The
2760 * VFS and page cache already protect us locally, so lots of readers/
2761 * writers can share a single PW lock.
2763 * There are problems with conversion deadlocks, so instead of
2764 * converting a read lock to a write lock, we'll just enqueue a new
2767 * At some point we should cancel the read lock instead of making them
2768 * send us a blocking callback, but there are problems with canceling
2769 * locks out from other users right now, too. */
2771 if (einfo->ei_mode == LCK_PR) {
2772 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
2773 &res_id, einfo->ei_type, &oinfo->oi_policy,
2774 LCK_PW, oinfo->oi_lockh);
2776 /* FIXME: This is not incredibly elegant, but it might
2777 * be more elegant than adding another parameter to
2778 * lock_match. I want a second opinion. */
2779 /* addref the lock only if not async requests. */
2780 if (!einfo->ei_rqset)
2781 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2782 osc_set_data_with_check(oinfo->oi_lockh,
2785 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2786 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2794                 int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2795 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
2797 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2798 LDLM_ENQUEUE, 2, size, NULL);
2802 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2803 size[DLM_REPLY_REC_OFF] =
2804 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2805 ptlrpc_req_set_repsize(req, 3, size);
2808 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2809 einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2811 rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
2812 &oinfo->oi_policy, einfo->ei_mode,
2813 &einfo->ei_flags, einfo->ei_cb_bl,
2814 einfo->ei_cb_cp, einfo->ei_cb_gl,
2816 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2817 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2818 lustre_swab_ost_lvb, oinfo->oi_lockh,
2819 einfo->ei_rqset ? 1 : 0);
2820 if (einfo->ei_rqset) {
2822 struct osc_enqueue_args *aa;
2823 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2824 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2829 req->rq_interpret_reply = osc_enqueue_interpret;
2830 ptlrpc_set_add_req(einfo->ei_rqset, req);
2831 } else if (intent) {
2832 ptlrpc_req_finished(req);
2837 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2839 ptlrpc_req_finished(req);
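/* RPC-free counterpart of osc_enqueue(): search only the local namespace for
 * an existing compatible extent lock (a cached PW lock can satisfy a PR
 * request) and never contact the server. */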
2844 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2845 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2846 int *flags, void *data, struct lustre_handle *lockh)
2848 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2849 struct obd_device *obd = exp->exp_obd;
2851 int lflags = *flags;
2854 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2856 /* Filesystem lock extents are extended to page boundaries so that
2857 * dealing with the page cache is a little smoother */
2858 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2859 policy->l_extent.end |= ~CFS_PAGE_MASK;
2861 /* Next, search for already existing extent locks that will cover us */
2862 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2863 policy, mode, lockh);
2865 //if (!(*flags & LDLM_FL_TEST_LOCK))
2866 osc_set_data_with_check(lockh, data, lflags);
2869 /* If we're trying to read, we also search for an existing PW lock. The
2870 * VFS and page cache already protect us locally, so lots of readers/
2871 * writers can share a single PW lock. */
2872 if (mode == LCK_PR) {
2873 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2875 policy, LCK_PW, lockh);
2876 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2877 /* FIXME: This is not incredibly elegant, but it might
2878 * be more elegant than adding another parameter to
2879 * lock_match. I want a second opinion. */
2880 osc_set_data_with_check(lockh, data, lflags);
2881 ldlm_lock_addref(lockh, LCK_PR);
2882 ldlm_lock_decref(lockh, LCK_PW);
2888 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2889 __u32 mode, struct lustre_handle *lockh)
2893 if (unlikely(mode == LCK_GROUP))
2894 ldlm_lock_decref_and_cancel(lockh, mode);
2896 ldlm_lock_decref(lockh, mode);
2901 static int osc_cancel_unused(struct obd_export *exp,
2902 struct lov_stripe_md *lsm, int flags, void *opaque)
2904 struct obd_device *obd = class_exp2obd(exp);
2905 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2907 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2911 static int osc_join_lru(struct obd_export *exp,
2912 struct lov_stripe_md *lsm, int join)
2914 struct obd_device *obd = class_exp2obd(exp);
2915 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2917 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
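/* Unpack an OST_STATFS reply and hand the statistics to the caller's
 * completion callback. */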
2920 static int osc_statfs_interpret(struct ptlrpc_request *req,
2921 struct osc_async_args *aa, int rc)
2923 struct obd_statfs *msfs;
2929 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2930 lustre_swab_obd_statfs);
2932 CERROR("Can't unpack obd_statfs\n");
2933 GOTO(out, rc = -EPROTO);
2936 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2938 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2942 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2943 __u64 max_age, struct ptlrpc_request_set *rqset)
2945 struct ptlrpc_request *req;
2946 struct osc_async_args *aa;
2947 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
2950 /* We could possibly pass max_age in the request (as an absolute
2951 * timestamp or a "seconds.usec ago") so the target can avoid doing
2952 * extra calls into the filesystem if that isn't necessary (e.g.
2953 * during mount that would help a bit). Having relative timestamps
2954 * is not so great if request processing is slow, while absolute
2955 * timestamps are not ideal because they need time synchronization. */
2956 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2957 OST_STATFS, 1, NULL, NULL);
2961 ptlrpc_req_set_repsize(req, 2, size);
2962 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2964 req->rq_interpret_reply = osc_statfs_interpret;
2965 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2966 aa = (struct osc_async_args *)&req->rq_async_args;
2969 ptlrpc_set_add_req(rqset, req);
2973 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2976 struct obd_statfs *msfs;
2977 struct ptlrpc_request *req;
2978 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
2981 /* We could possibly pass max_age in the request (as an absolute
2982 * timestamp or a "seconds.usec ago") so the target can avoid doing
2983 * extra calls into the filesystem if that isn't necessary (e.g.
2984 * during mount that would help a bit). Having relative timestamps
2985 * is not so great if request processing is slow, while absolute
2986 * timestamps are not ideal because they need time synchronization. */
2987 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2988 OST_STATFS, 1, NULL, NULL);
2992 ptlrpc_req_set_repsize(req, 2, size);
2993 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2995 rc = ptlrpc_queue_wait(req);
2999 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3000 lustre_swab_obd_statfs);
3002 CERROR("Can't unpack obd_statfs\n");
3003 GOTO(out, rc = -EPROTO);
3006 memcpy(osfs, msfs, sizeof(*osfs));
3010 ptlrpc_req_finished(req);
3014 /* Retrieve object striping information.
3016  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3017  * the maximum number of OST indices which will fit in the user buffer.
3018  * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
3020 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3022 struct lov_user_md lum, *lumk;
3023 int rc = 0, lum_size;
3029 if (copy_from_user(&lum, lump, sizeof(lum)))
3032 if (lum.lmm_magic != LOV_USER_MAGIC)
3035 if (lum.lmm_stripe_count > 0) {
3036 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3037 OBD_ALLOC(lumk, lum_size);
3041 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3043 lum_size = sizeof(lum);
3047 lumk->lmm_object_id = lsm->lsm_object_id;
3048 lumk->lmm_stripe_count = 1;
3050 if (copy_to_user(lump, lumk, lum_size))
3054 OBD_FREE(lumk, lum_size);
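/* ioctl entry point. Note that OBD_IOC_LOV_GET_CONFIG below fakes up a
 * single-target lov_desc, so tools written against the LOV interface also
 * work when pointed directly at an OSC. */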
3060 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3061 void *karg, void *uarg)
3063 struct obd_device *obd = exp->exp_obd;
3064 struct obd_ioctl_data *data = karg;
3068 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3071 if (!try_module_get(THIS_MODULE)) {
3072 CERROR("Can't get module. Is it alive?");
3077 case OBD_IOC_LOV_GET_CONFIG: {
3079 struct lov_desc *desc;
3080 struct obd_uuid uuid;
3084 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3085 GOTO(out, err = -EINVAL);
3087 data = (struct obd_ioctl_data *)buf;
3089 if (sizeof(*desc) > data->ioc_inllen1) {
3090 obd_ioctl_freedata(buf, len);
3091 GOTO(out, err = -EINVAL);
3094 if (data->ioc_inllen2 < sizeof(uuid)) {
3095 obd_ioctl_freedata(buf, len);
3096 GOTO(out, err = -EINVAL);
3099 desc = (struct lov_desc *)data->ioc_inlbuf1;
3100 desc->ld_tgt_count = 1;
3101 desc->ld_active_tgt_count = 1;
3102 desc->ld_default_stripe_count = 1;
3103 desc->ld_default_stripe_size = 0;
3104 desc->ld_default_stripe_offset = 0;
3105 desc->ld_pattern = 0;
3106 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3108 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3110 err = copy_to_user((void *)uarg, buf, len);
3113 obd_ioctl_freedata(buf, len);
3116 case LL_IOC_LOV_SETSTRIPE:
3117 err = obd_alloc_memmd(exp, karg);
3121 case LL_IOC_LOV_GETSTRIPE:
3122 err = osc_getstripe(karg, uarg);
3124 case OBD_IOC_CLIENT_RECOVER:
3125 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3130 case IOC_OSC_SET_ACTIVE:
3131 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3134 case OBD_IOC_POLL_QUOTACHECK:
3135 err = lquota_poll_check(quota_interface, exp,
3136 (struct if_quotacheck *)karg);
3139 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3140 cmd, cfs_curproc_comm());
3141 GOTO(out, err = -ENOTTY);
3144 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3147 module_put(THIS_MODULE);
3152 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3153 void *key, __u32 *vallen, void *val)
3156 if (!vallen || !val)
3159 if (keylen > strlen("lock_to_stripe") &&
3160 strcmp(key, "lock_to_stripe") == 0) {
3161 __u32 *stripe = val;
3162 *vallen = sizeof(*stripe);
3165 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3166 struct ptlrpc_request *req;
3168 char *bufs[2] = { NULL, key };
3169 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3171 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3172 OST_GET_INFO, 2, size, bufs);
3176 size[REPLY_REC_OFF] = *vallen;
3177 ptlrpc_req_set_repsize(req, 2, size);
3178 rc = ptlrpc_queue_wait(req);
3182 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3183 lustre_swab_ost_last_id);
3184 if (reply == NULL) {
3185 CERROR("Can't unpack OST last ID\n");
3186 GOTO(out, rc = -EPROTO);
3188 *((obd_id *)val) = *reply;
3190 ptlrpc_req_finished(req);
3196 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3199 struct llog_ctxt *ctxt;
3200 struct obd_import *imp = req->rq_import;
3206 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3209 rc = llog_initiator_connect(ctxt);
3211 CERROR("cannot establish connection for "
3212 "ctxt %p: %d\n", ctxt, rc);
3215 imp->imp_server_timeout = 1;
3216 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3217 imp->imp_pingable = 1;
3222 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3223 void *key, obd_count vallen, void *val,
3224 struct ptlrpc_request_set *set)
3226 struct ptlrpc_request *req;
3227 struct obd_device *obd = exp->exp_obd;
3228 struct obd_import *imp = class_exp2cliimp(exp);
3229 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3230 char *bufs[3] = { NULL, key, val };
3233 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3235 if (KEY_IS(KEY_NEXT_ID)) {
3236 if (vallen != sizeof(obd_id))
3238 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3239 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3240 exp->exp_obd->obd_name,
3241 obd->u.cli.cl_oscc.oscc_next_id);
3246 if (KEY_IS("unlinked")) {
3247 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3248 spin_lock(&oscc->oscc_lock);
3249 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3250 spin_unlock(&oscc->oscc_lock);
3254 if (KEY_IS(KEY_INIT_RECOV)) {
3255 if (vallen != sizeof(int))
3257 imp->imp_initial_recov = *(int *)val;
3258 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3259 exp->exp_obd->obd_name,
3260 imp->imp_initial_recov);
3264 if (KEY_IS("checksum")) {
3265 if (vallen != sizeof(int))
3267 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3274 /* We pass all other commands directly to OST. Since nobody calls osc
3275 methods directly and everybody is supposed to go through LOV, we
3276 assume lov checked invalid values for us.
3277 The only recognised values so far are evict_by_nid and mds_conn.
3278        Even if something bad goes through, we'd get a -EINVAL from OST
3279        anyway. */
3281 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3286 if (KEY_IS("mds_conn"))
3287 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3289 ptlrpc_req_set_repsize(req, 1, NULL);
3290 ptlrpc_set_add_req(set, req);
3291 ptlrpc_check_set(set);
3297 static struct llog_operations osc_size_repl_logops = {
3298 lop_cancel: llog_obd_repl_cancel
3301 static struct llog_operations osc_mds_ost_orig_logops;
3302 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3303 int count, struct llog_catid *catid,
3304 struct obd_uuid *uuid)
3309 spin_lock(&obd->obd_dev_lock);
3310 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3311 osc_mds_ost_orig_logops = llog_lvfs_ops;
3312 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3313 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3314 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3315 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3317 spin_unlock(&obd->obd_dev_lock);
3319 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3320 &catid->lci_logid, &osc_mds_ost_orig_logops);
3322 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3326 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3327 &osc_size_repl_logops);
3329 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3332 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3333 obd->obd_name, tgt->obd_name, count, catid, rc);
3334 CERROR("logid "LPX64":0x%x\n",
3335 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3340 static int osc_llog_finish(struct obd_device *obd, int count)
3342 struct llog_ctxt *ctxt;
3343 int rc = 0, rc2 = 0;
3346 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3348 rc = llog_cleanup(ctxt);
3350 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3352 rc2 = llog_cleanup(ctxt);
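/* On reconnect, ask the OST for our outstanding grant again (or, with none
 * left, for a default of two full RPCs worth) and reset the lost-grant
 * accounting. */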
3359 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3360 struct obd_uuid *cluuid,
3361 struct obd_connect_data *data)
3363 struct client_obd *cli = &obd->u.cli;
3365 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3368 client_obd_list_lock(&cli->cl_loi_list_lock);
3369 data->ocd_grant = cli->cl_avail_grant ?:
3370 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3371 lost_grant = cli->cl_lost_grant;
3372 cli->cl_lost_grant = 0;
3373 client_obd_list_unlock(&cli->cl_loi_list_lock);
3375 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3376 "cl_lost_grant: %ld\n", data->ocd_grant,
3377 cli->cl_avail_grant, lost_grant);
3378 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3379 " ocd_grant: %d\n", data->ocd_connect_flags,
3380 data->ocd_version, data->ocd_grant);
3386 static int osc_disconnect(struct obd_export *exp)
3388 struct obd_device *obd = class_exp2obd(exp);
3389 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3392 if (obd->u.cli.cl_conn_count == 1)
3393 /* flush any remaining cancel messages out to the target */
3394 llog_sync(ctxt, exp);
3396 rc = client_disconnect_export(exp);
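/* React to import state changes: flag the creator around recovery on MDS
 * OSCs, drop grant and fail cached pages when the import is invalidated,
 * and pick up grant and portal settings once connect data arrives. */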
3400 static int osc_import_event(struct obd_device *obd,
3401 struct obd_import *imp,
3402 enum obd_import_event event)
3404 struct client_obd *cli;
3408 LASSERT(imp->imp_obd == obd);
3411 case IMP_EVENT_DISCON: {
3412 /* Only do this on the MDS OSC's */
3413 if (imp->imp_server_timeout) {
3414 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3416 spin_lock(&oscc->oscc_lock);
3417 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3418 spin_unlock(&oscc->oscc_lock);
3423 case IMP_EVENT_INACTIVE: {
3424 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3427 case IMP_EVENT_INVALIDATE: {
3428 struct ldlm_namespace *ns = obd->obd_namespace;
3432 client_obd_list_lock(&cli->cl_loi_list_lock);
3433 cli->cl_avail_grant = 0;
3434 cli->cl_lost_grant = 0;
3435 /* all pages go to failing rpcs due to the invalid import */
3436 osc_check_rpcs(cli);
3437 client_obd_list_unlock(&cli->cl_loi_list_lock);
3439 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3443 case IMP_EVENT_ACTIVE: {
3444 /* Only do this on the MDS OSC's */
3445 if (imp->imp_server_timeout) {
3446 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3448 spin_lock(&oscc->oscc_lock);
3449 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3450 spin_unlock(&oscc->oscc_lock);
3452 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3455 case IMP_EVENT_OCD: {
3456 struct obd_connect_data *ocd = &imp->imp_connect_data;
3458 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3459 osc_init_grant(&obd->u.cli, ocd);
3462 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3463 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3465 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3469 CERROR("Unknown import event %d\n", event);
3475 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3481 rc = ptlrpcd_addref();
3485 rc = client_obd_setup(obd, len, buf);
3489 struct lprocfs_static_vars lvars;
3490 struct client_obd *cli = &obd->u.cli;
3492 lprocfs_init_vars(osc, &lvars);
3493 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3494 lproc_osc_attach_seqstat(obd);
3495 ptlrpc_lprocfs_register_obd(obd);
3499         /* We need to allocate a few more requests, because
3500            brw_interpret_oap tries to create new requests before freeing
3501            previous ones. Ideally we want to have 2x max_rpcs_in_flight
3502            reserved, but I'm afraid that might be too much wasted RAM
3503            in fact, so 2 is just my guess and should still work. */
3504 cli->cl_import->imp_rq_pool =
3505 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3507 ptlrpc_add_rqs_to_pool);
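/* Cleanup runs in stages driven by obdclass: deactivate the import early so
 * no new RPCs start, then invalidate and free it once exports are gone. */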
3513 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3519 case OBD_CLEANUP_EARLY: {
3520 struct obd_import *imp;
3521 imp = obd->u.cli.cl_import;
3522 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3523 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3524 ptlrpc_deactivate_import(imp);
3527 case OBD_CLEANUP_EXPORTS: {
3528 /* If we set up but never connected, the
3529 client import will not have been cleaned. */
3530 if (obd->u.cli.cl_import) {
3531 struct obd_import *imp;
3532 imp = obd->u.cli.cl_import;
3533 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3535 ptlrpc_invalidate_import(imp);
3536 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3537 class_destroy_import(imp);
3538 obd->u.cli.cl_import = NULL;
3542 case OBD_CLEANUP_SELF_EXP:
3543 rc = obd_llog_finish(obd, 0);
3545 CERROR("failed to cleanup llogging subsystems\n");
3547 case OBD_CLEANUP_OBD:
3553 int osc_cleanup(struct obd_device *obd)
3555 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3559 ptlrpc_lprocfs_unregister_obd(obd);
3560 lprocfs_obd_cleanup(obd);
3562 spin_lock(&oscc->oscc_lock);
3563 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3564 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3565 spin_unlock(&oscc->oscc_lock);
3567 /* free memory of osc quota cache */
3568 lquota_cleanup(quota_interface, obd);
3570 rc = client_obd_cleanup(obd);
3576 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3578 struct lustre_cfg *lcfg = buf;
3579 struct lprocfs_static_vars lvars;
3582 lprocfs_init_vars(osc, &lvars);
3584 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
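/* The method table registered with obdclass; LOV dispatches through these
 * entry points for each stripe's OSC. */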
3588 struct obd_ops osc_obd_ops = {
3589 .o_owner = THIS_MODULE,
3590 .o_setup = osc_setup,
3591 .o_precleanup = osc_precleanup,
3592 .o_cleanup = osc_cleanup,
3593 .o_add_conn = client_import_add_conn,
3594 .o_del_conn = client_import_del_conn,
3595 .o_connect = client_connect_import,
3596 .o_reconnect = osc_reconnect,
3597 .o_disconnect = osc_disconnect,
3598 .o_statfs = osc_statfs,
3599 .o_statfs_async = osc_statfs_async,
3600 .o_packmd = osc_packmd,
3601 .o_unpackmd = osc_unpackmd,
3602 .o_create = osc_create,
3603 .o_destroy = osc_destroy,
3604 .o_getattr = osc_getattr,
3605 .o_getattr_async = osc_getattr_async,
3606 .o_setattr = osc_setattr,
3607 .o_setattr_async = osc_setattr_async,
3609 .o_brw_async = osc_brw_async,
3610 .o_prep_async_page = osc_prep_async_page,
3611 .o_queue_async_io = osc_queue_async_io,
3612 .o_set_async_flags = osc_set_async_flags,
3613 .o_queue_group_io = osc_queue_group_io,
3614 .o_trigger_group_io = osc_trigger_group_io,
3615 .o_teardown_async_page = osc_teardown_async_page,
3616 .o_punch = osc_punch,
3618 .o_enqueue = osc_enqueue,
3619 .o_match = osc_match,
3620 .o_change_cbdata = osc_change_cbdata,
3621 .o_cancel = osc_cancel,
3622 .o_cancel_unused = osc_cancel_unused,
3623 .o_join_lru = osc_join_lru,
3624 .o_iocontrol = osc_iocontrol,
3625 .o_get_info = osc_get_info,
3626 .o_set_info_async = osc_set_info_async,
3627 .o_import_event = osc_import_event,
3628 .o_llog_init = osc_llog_init,
3629 .o_llog_finish = osc_llog_finish,
3630 .o_process_config = osc_process_config,
3633 int __init osc_init(void)
3635 struct lprocfs_static_vars lvars;
3639 lprocfs_init_vars(osc, &lvars);
3641 request_module("lquota");
3642 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3643 lquota_init(quota_interface);
3644 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3646 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3649 if (quota_interface)
3650 PORTAL_SYMBOL_PUT(osc_quota_interface);
3658 static void /*__exit*/ osc_exit(void)
3660 lquota_exit(quota_interface);
3661 if (quota_interface)
3662 PORTAL_SYMBOL_PUT(osc_quota_interface);
3664 class_unregister_type(LUSTRE_OSC_NAME);
3667 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3668 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3669 MODULE_LICENSE("GPL");
3671 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);