1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although * it does not export a full OBD method table (the
27 * requests are coming * in over the wire, so object target modules
28 * do not have a full * method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static quota_interface_t *quota_interface;
67 extern quota_interface_t osc_quota_interface;
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71 struct lov_stripe_md *lsm)
76 lmm_size = sizeof(**lmmp);
81 OBD_FREE(*lmmp, lmm_size);
87 OBD_ALLOC(*lmmp, lmm_size);
93 LASSERT(lsm->lsm_object_id);
94 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
100 /* Unpack OSC object metadata from disk storage (LE byte order). */
101 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
102 struct lov_mds_md *lmm, int lmm_bytes)
108 if (lmm_bytes < sizeof (*lmm)) {
109 CERROR("lov_mds_md too small: %d, need %d\n",
110 lmm_bytes, (int)sizeof(*lmm));
113 /* XXX LOV_MAGIC etc check? */
115 if (lmm->lmm_object_id == 0) {
116 CERROR("lov_mds_md: zero lmm_object_id\n");
121 lsm_size = lov_stripe_md_size(1);
125 if (*lsmp != NULL && lmm == NULL) {
126 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
127 OBD_FREE(*lsmp, lsm_size);
133 OBD_ALLOC(*lsmp, lsm_size);
136 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
137 if ((*lsmp)->lsm_oinfo[0] == NULL) {
138 OBD_FREE(*lsmp, lsm_size);
141 loi_init((*lsmp)->lsm_oinfo[0]);
145 /* XXX zero *lsmp? */
146 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
147 LASSERT((*lsmp)->lsm_object_id);
150 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
155 static int osc_getattr_interpret(struct ptlrpc_request *req,
156 struct osc_async_args *aa, int rc)
158 struct ost_body *body;
164 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
165 lustre_swab_ost_body);
167 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
168 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
170 /* This should really be sent by the OST */
171 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
172 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
174 CERROR("can't unpack ost_body\n");
176 aa->aa_oi->oi_oa->o_valid = 0;
179 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
183 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
184 struct ptlrpc_request_set *set)
186 struct ptlrpc_request *req;
187 struct ost_body *body;
188 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
189 struct osc_async_args *aa;
192 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
193 OST_GETATTR, 2, size,NULL);
197 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
198 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
200 ptlrpc_req_set_repsize(req, 2, size);
201 req->rq_interpret_reply = osc_getattr_interpret;
203 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
204 aa = (struct osc_async_args *)&req->rq_async_args;
207 ptlrpc_set_add_req(set, req);
211 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
213 struct ptlrpc_request *req;
214 struct ost_body *body;
215 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
218 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
219 OST_GETATTR, 2, size, NULL);
223 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
224 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
226 ptlrpc_req_set_repsize(req, 2, size);
228 rc = ptlrpc_queue_wait(req);
230 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
234 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
235 lustre_swab_ost_body);
237 CERROR ("can't unpack ost_body\n");
238 GOTO (out, rc = -EPROTO);
241 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
242 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
244 /* This should really be sent by the OST */
245 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
246 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
250 ptlrpc_req_finished(req);
254 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
255 struct obd_trans_info *oti)
257 struct ptlrpc_request *req;
258 struct ost_body *body;
259 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
262 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
263 OST_SETATTR, 2, size, NULL);
267 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
268 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
270 ptlrpc_req_set_repsize(req, 2, size);
272 rc = ptlrpc_queue_wait(req);
276 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
277 lustre_swab_ost_body);
279 GOTO(out, rc = -EPROTO);
281 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
285 ptlrpc_req_finished(req);
289 static int osc_setattr_interpret(struct ptlrpc_request *req,
290 struct osc_async_args *aa, int rc)
292 struct ost_body *body;
298 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
299 lustre_swab_ost_body);
301 CERROR("can't unpack ost_body\n");
302 GOTO(out, rc = -EPROTO);
305 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
307 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
311 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
312 struct obd_trans_info *oti,
313 struct ptlrpc_request_set *rqset)
315 struct ptlrpc_request *req;
316 struct ost_body *body;
317 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
318 struct osc_async_args *aa;
321 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
322 OST_SETATTR, 2, size, NULL);
326 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
328 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
330 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
331 sizeof(*oti->oti_logcookies));
334 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
335 ptlrpc_req_set_repsize(req, 2, size);
336 /* do mds to ost setattr asynchronouly */
338 /* Do not wait for response. */
339 ptlrpcd_add_req(req);
341 req->rq_interpret_reply = osc_setattr_interpret;
343 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
344 aa = (struct osc_async_args *)&req->rq_async_args;
347 ptlrpc_set_add_req(rqset, req);
353 int osc_real_create(struct obd_export *exp, struct obdo *oa,
354 struct lov_stripe_md **ea, struct obd_trans_info *oti)
356 struct ptlrpc_request *req;
357 struct ost_body *body;
358 struct lov_stripe_md *lsm;
359 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
367 rc = obd_alloc_memmd(exp, &lsm);
372 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
373 OST_CREATE, 2, size, NULL);
375 GOTO(out, rc = -ENOMEM);
377 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
378 memcpy(&body->oa, oa, sizeof(body->oa));
380 ptlrpc_req_set_repsize(req, 2, size);
381 if (oa->o_valid & OBD_MD_FLINLINE) {
382 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
383 oa->o_flags == OBD_FL_DELORPHAN);
385 "delorphan from OST integration");
386 /* Don't resend the delorphan req */
387 req->rq_no_resend = req->rq_no_delay = 1;
390 rc = ptlrpc_queue_wait(req);
394 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
395 lustre_swab_ost_body);
397 CERROR ("can't unpack ost_body\n");
398 GOTO (out_req, rc = -EPROTO);
401 memcpy(oa, &body->oa, sizeof(*oa));
403 /* This should really be sent by the OST */
404 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
405 oa->o_valid |= OBD_MD_FLBLKSZ;
407 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
408 * have valid lsm_oinfo data structs, so don't go touching that.
409 * This needs to be fixed in a big way.
411 lsm->lsm_object_id = oa->o_id;
415 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
417 if (oa->o_valid & OBD_MD_FLCOOKIE) {
418 if (!oti->oti_logcookies)
419 oti_alloc_cookies(oti, 1);
420 memcpy(oti->oti_logcookies, obdo_logcookie(oa),
421 sizeof(oti->oti_onecookie));
425 CDEBUG(D_HA, "transno: "LPD64"\n",
426 lustre_msg_get_transno(req->rq_repmsg));
429 ptlrpc_req_finished(req);
432 obd_free_memmd(exp, &lsm);
436 static int osc_punch_interpret(struct ptlrpc_request *req,
437 struct osc_async_args *aa, int rc)
439 struct ost_body *body;
445 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
446 lustre_swab_ost_body);
448 CERROR ("can't unpack ost_body\n");
449 GOTO(out, rc = -EPROTO);
452 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
454 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
458 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
459 struct obd_trans_info *oti,
460 struct ptlrpc_request_set *rqset)
462 struct ptlrpc_request *req;
463 struct osc_async_args *aa;
464 struct ost_body *body;
465 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
473 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
474 OST_PUNCH, 2, size, NULL);
478 /* FIXME bug 249. Also see bug 7198 */
479 if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
480 OBD_CONNECT_REQPORTAL)
481 req->rq_request_portal = OST_IO_PORTAL;
483 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
484 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
486 /* overload the size and blocks fields in the oa with start/end */
487 body->oa.o_size = oinfo->oi_policy.l_extent.start;
488 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
489 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
491 ptlrpc_req_set_repsize(req, 2, size);
493 req->rq_interpret_reply = osc_punch_interpret;
494 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
495 aa = (struct osc_async_args *)&req->rq_async_args;
497 ptlrpc_set_add_req(rqset, req);
502 static int osc_sync(struct obd_export *exp, struct obdo *oa,
503 struct lov_stripe_md *md, obd_size start, obd_size end)
505 struct ptlrpc_request *req;
506 struct ost_body *body;
507 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
515 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
516 OST_SYNC, 2, size, NULL);
520 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
521 memcpy(&body->oa, oa, sizeof(*oa));
523 /* overload the size and blocks fields in the oa with start/end */
524 body->oa.o_size = start;
525 body->oa.o_blocks = end;
526 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
528 ptlrpc_req_set_repsize(req, 2, size);
530 rc = ptlrpc_queue_wait(req);
534 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
535 lustre_swab_ost_body);
537 CERROR ("can't unpack ost_body\n");
538 GOTO (out, rc = -EPROTO);
541 memcpy(oa, &body->oa, sizeof(*oa));
545 ptlrpc_req_finished(req);
549 /* Destroy requests can be async always on the client, and we don't even really
550 * care about the return code since the client cannot do anything at all about
552 * When the MDS is unlinking a filename, it saves the file objects into a
553 * recovery llog, and these object records are cancelled when the OST reports
554 * they were destroyed and sync'd to disk (i.e. transaction committed).
555 * If the client dies, or the OST is down when the object should be destroyed,
556 * the records are not cancelled, and when the OST reconnects to the MDS next,
557 * it will retrieve the llog unlink logs and then sends the log cancellation
558 * cookies to the MDS after committing destroy transactions. */
559 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
560 struct lov_stripe_md *ea, struct obd_trans_info *oti,
561 struct obd_export *md_export)
563 struct ptlrpc_request *req;
564 struct ost_body *body;
565 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
573 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
574 OST_DESTROY, 2, size, NULL);
578 /* FIXME bug 249. Also see bug 7198 */
579 if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
580 OBD_CONNECT_REQPORTAL)
581 req->rq_request_portal = OST_IO_PORTAL;
583 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
585 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
586 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
587 sizeof(*oti->oti_logcookies));
590 memcpy(&body->oa, oa, sizeof(*oa));
591 ptlrpc_req_set_repsize(req, 2, size);
593 ptlrpcd_add_req(req);
597 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
600 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
602 LASSERT(!(oa->o_valid & bits));
605 client_obd_list_lock(&cli->cl_loi_list_lock);
606 oa->o_dirty = cli->cl_dirty;
607 if (cli->cl_dirty > cli->cl_dirty_max) {
608 CERROR("dirty %lu > dirty_max %lu\n",
609 cli->cl_dirty, cli->cl_dirty_max);
611 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
612 CERROR("dirty %d > system dirty_max %d\n",
613 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
615 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
616 CERROR("dirty %lu - dirty_max %lu too big???\n",
617 cli->cl_dirty, cli->cl_dirty_max);
620 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
621 (cli->cl_max_rpcs_in_flight + 1);
622 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
624 oa->o_grant = cli->cl_avail_grant;
625 oa->o_dropped = cli->cl_lost_grant;
626 cli->cl_lost_grant = 0;
627 client_obd_list_unlock(&cli->cl_loi_list_lock);
628 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
629 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
632 /* caller must hold loi_list_lock */
633 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
635 atomic_inc(&obd_dirty_pages);
636 cli->cl_dirty += CFS_PAGE_SIZE;
637 cli->cl_avail_grant -= CFS_PAGE_SIZE;
638 pga->flag |= OBD_BRW_FROM_GRANT;
639 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
640 CFS_PAGE_SIZE, pga, pga->pg);
641 LASSERT(cli->cl_avail_grant >= 0);
644 /* the companion to osc_consume_write_grant, called when a brw has completed.
645 * must be called with the loi lock held. */
646 static void osc_release_write_grant(struct client_obd *cli,
647 struct brw_page *pga, int sent)
649 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
652 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
657 pga->flag &= ~OBD_BRW_FROM_GRANT;
658 atomic_dec(&obd_dirty_pages);
659 cli->cl_dirty -= CFS_PAGE_SIZE;
661 cli->cl_lost_grant += CFS_PAGE_SIZE;
662 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
663 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
664 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
665 /* For short writes we shouldn't count parts of pages that
666 * span a whole block on the OST side, or our accounting goes
667 * wrong. Should match the code in filter_grant_check. */
668 int offset = pga->off & ~CFS_PAGE_MASK;
669 int count = pga->count + (offset & (blocksize - 1));
670 int end = (offset + pga->count) & (blocksize - 1);
672 count += blocksize - end;
674 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
675 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
676 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
677 cli->cl_avail_grant, cli->cl_dirty);
683 static unsigned long rpcs_in_flight(struct client_obd *cli)
685 return cli->cl_r_in_flight + cli->cl_w_in_flight;
688 /* caller must hold loi_list_lock */
689 void osc_wake_cache_waiters(struct client_obd *cli)
691 struct list_head *l, *tmp;
692 struct osc_cache_waiter *ocw;
695 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
696 /* if we can't dirty more, we must wait until some is written */
697 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
698 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
699 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
700 "osc max %ld, sys max %d\n", cli->cl_dirty,
701 cli->cl_dirty_max, obd_max_dirty_pages);
705 /* if still dirty cache but no grant wait for pending RPCs that
706 * may yet return us some grant before doing sync writes */
707 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
708 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
709 cli->cl_w_in_flight);
713 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
714 list_del_init(&ocw->ocw_entry);
715 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
716 /* no more RPCs in flight to return grant, do sync IO */
717 ocw->ocw_rc = -EDQUOT;
718 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
720 osc_consume_write_grant(cli,
721 &ocw->ocw_oap->oap_brw_page);
724 cfs_waitq_signal(&ocw->ocw_waitq);
730 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
732 client_obd_list_lock(&cli->cl_loi_list_lock);
733 cli->cl_avail_grant = ocd->ocd_grant;
734 client_obd_list_unlock(&cli->cl_loi_list_lock);
736 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
737 cli->cl_avail_grant, cli->cl_lost_grant);
738 LASSERT(cli->cl_avail_grant >= 0);
741 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
743 client_obd_list_lock(&cli->cl_loi_list_lock);
744 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
745 cli->cl_avail_grant += body->oa.o_grant;
746 /* waiters are woken in brw_interpret_oap */
747 client_obd_list_unlock(&cli->cl_loi_list_lock);
750 /* We assume that the reason this OSC got a short read is because it read
751 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
752 * via the LOV, and it _knows_ it's reading inside the file, it's just that
753 * this stripe never got written at or beyond this stripe offset yet. */
754 static void handle_short_read(int nob_read, obd_count page_count,
755 struct brw_page **pga)
760 /* skip bytes read OK */
761 while (nob_read > 0) {
762 LASSERT (page_count > 0);
764 if (pga[i]->count > nob_read) {
765 /* EOF inside this page */
766 ptr = cfs_kmap(pga[i]->pg) +
767 (pga[i]->off & ~CFS_PAGE_MASK);
768 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
769 cfs_kunmap(pga[i]->pg);
775 nob_read -= pga[i]->count;
780 /* zero remaining pages */
781 while (page_count-- > 0) {
782 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
783 memset(ptr, 0, pga[i]->count);
784 cfs_kunmap(pga[i]->pg);
789 static int check_write_rcs(struct ptlrpc_request *req,
790 int requested_nob, int niocount,
791 obd_count page_count, struct brw_page **pga)
795 /* return error if any niobuf was in error */
796 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
797 sizeof(*remote_rcs) * niocount, NULL);
798 if (remote_rcs == NULL) {
799 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
802 if (lustre_msg_swabbed(req->rq_repmsg))
803 for (i = 0; i < niocount; i++)
804 __swab32s(&remote_rcs[i]);
806 for (i = 0; i < niocount; i++) {
807 if (remote_rcs[i] < 0)
808 return(remote_rcs[i]);
810 if (remote_rcs[i] != 0) {
811 CERROR("rc[%d] invalid (%d) req %p\n",
812 i, remote_rcs[i], req);
817 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
818 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
819 requested_nob, req->rq_bulk->bd_nob_transferred);
826 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
828 if (p1->flag != p2->flag) {
829 unsigned mask = ~OBD_BRW_FROM_GRANT;
831 /* warn if we try to combine flags that we don't know to be
833 if ((p1->flag & mask) != (p2->flag & mask))
834 CERROR("is it ok to have flags 0x%x and 0x%x in the "
835 "same brw?\n", p1->flag, p2->flag);
839 return (p1->off + p1->count == p2->off);
842 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
843 struct brw_page **pga)
848 LASSERT (pg_count > 0);
849 while (nob > 0 && pg_count > 0) {
850 char *ptr = cfs_kmap(pga[i]->pg);
851 int off = pga[i]->off & ~CFS_PAGE_MASK;
852 int count = pga[i]->count > nob ? nob : pga[i]->count;
854 /* corrupt the data before we compute the checksum, to
855 * simulate an OST->client data error */
856 if (i == 0 &&OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
857 memcpy(ptr + off, "bad1", min(4, nob));
858 cksum = crc32_le(cksum, ptr + off, count);
859 cfs_kunmap(pga[i]->pg);
860 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
863 nob -= pga[i]->count;
867 /* For sending we only compute the wrong checksum instead
868 * of corrupting the data so it is still correct on a redo */
869 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
875 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
876 struct lov_stripe_md *lsm, obd_count page_count,
877 struct brw_page **pga,
878 struct ptlrpc_request **reqp)
880 struct ptlrpc_request *req;
881 struct ptlrpc_bulk_desc *desc;
882 struct ost_body *body;
883 struct obd_ioobj *ioobj;
884 struct niobuf_remote *niobuf;
885 int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
886 int niocount, i, requested_nob, opc, rc;
887 struct ptlrpc_request_pool *pool;
888 struct osc_brw_async_args *aa;
891 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
892 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
894 for (niocount = i = 1; i < page_count; i++) {
895 if (!can_merge_pages(pga[i - 1], pga[i]))
899 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
900 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
902 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
903 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
908 /* FIXME bug 249. Also see bug 7198 */
909 if (cli->cl_import->imp_connect_data.ocd_connect_flags &
910 OBD_CONNECT_REQPORTAL)
911 req->rq_request_portal = OST_IO_PORTAL;
913 if (opc == OST_WRITE)
914 desc = ptlrpc_prep_bulk_imp (req, page_count,
915 BULK_GET_SOURCE, OST_BULK_PORTAL);
917 desc = ptlrpc_prep_bulk_imp (req, page_count,
918 BULK_PUT_SINK, OST_BULK_PORTAL);
920 GOTO(out, rc = -ENOMEM);
921 /* NB request now owns desc and will free it when it gets freed */
923 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
924 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
925 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
926 niocount * sizeof(*niobuf));
928 memcpy(&body->oa, oa, sizeof(*oa));
930 obdo_to_ioobj(oa, ioobj);
931 ioobj->ioo_bufcnt = niocount;
933 LASSERT (page_count > 0);
934 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
935 struct brw_page *pg = pga[i];
936 struct brw_page *pg_prev = pga[i - 1];
938 LASSERT(pg->count > 0);
939 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
940 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
943 LASSERTF(i == 0 || pg->off > pg_prev->off,
944 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
945 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
947 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
948 pg_prev->pg, page_private(pg_prev->pg),
949 pg_prev->pg->index, pg_prev->off);
951 LASSERTF(i == 0 || pg->off > pg_prev->off,
952 "i %d p_c %u\n", i, page_count);
954 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
955 (pg->flag & OBD_BRW_SRVLOCK));
957 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
959 requested_nob += pg->count;
961 if (i > 0 && can_merge_pages(pg_prev, pg)) {
963 niobuf->len += pg->count;
965 niobuf->offset = pg->off;
966 niobuf->len = pg->count;
967 niobuf->flags = pg->flag;
971 LASSERT((void *)(niobuf - niocount) ==
972 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
973 niocount * sizeof(*niobuf)));
974 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
976 /* size[REQ_REC_OFF] still sizeof (*body) */
977 if (opc == OST_WRITE) {
978 if (unlikely(cli->cl_checksum)) {
979 body->oa.o_valid |= OBD_MD_FLCKSUM;
980 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
982 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
984 /* save this in 'oa', too, for later checking */
985 oa->o_valid |= OBD_MD_FLCKSUM;
987 /* clear out the checksum flag, in case this is a
988 * resend but cl_checksum is no longer set. b=11238 */
989 oa->o_valid &= ~OBD_MD_FLCKSUM;
991 oa->o_cksum = body->oa.o_cksum;
992 /* 1 RC per niobuf */
993 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
994 ptlrpc_req_set_repsize(req, 3, size);
996 if (unlikely(cli->cl_checksum))
997 body->oa.o_valid |= OBD_MD_FLCKSUM;
998 /* 1 RC for the whole I/O */
999 ptlrpc_req_set_repsize(req, 2, size);
1002 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1003 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1005 aa->aa_requested_nob = requested_nob;
1006 aa->aa_nio_count = niocount;
1007 aa->aa_page_count = page_count;
1008 aa->aa_retries = 5; /*retry for checksum errors; lprocfs? */
1011 INIT_LIST_HEAD(&aa->aa_oaps);
1017 ptlrpc_req_finished (req);
1021 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1022 __u32 client_cksum, __u32 server_cksum, int nob,
1023 obd_count page_count, struct brw_page **pga)
1028 if (server_cksum == client_cksum) {
1029 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1033 new_cksum = osc_checksum_bulk(nob, page_count, pga);
1035 if (new_cksum == server_cksum)
1036 msg = "changed on the client after we checksummed it";
1037 else if (new_cksum == client_cksum)
1038 msg = "changed in transit before arrival at OST";
1040 msg = "changed in transit AND doesn't match the original";
1042 LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
1043 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1044 msg, libcfs_nid2str(peer->nid),
1045 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1046 oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
1048 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1050 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1051 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1052 client_cksum, server_cksum, new_cksum);
1057 /* Note rc enters this function as number of bytes transferred */
1058 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1060 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1061 const lnet_process_id_t *peer =
1062 &req->rq_import->imp_connection->c_peer;
1063 struct client_obd *cli = aa->aa_cli;
1064 struct ost_body *body;
1065 __u32 client_cksum = 0;
1068 if (rc < 0 && rc != -EDQUOT)
1071 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1072 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1073 lustre_swab_ost_body);
1075 CERROR ("Can't unpack body\n");
1079 /* set/clear over quota flag for a uid/gid */
1080 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1081 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1082 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1083 body->oa.o_gid, body->oa.o_valid,
1089 if (req->rq_set && req->rq_set->set_countp)
1090 atomic_add(rc, (atomic_t *)req->rq_set->set_countp);
1092 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1093 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1095 osc_update_grant(cli, body);
1097 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1099 CERROR ("Unexpected +ve rc %d\n", rc);
1102 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1104 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1106 check_write_checksum(&body->oa, peer, client_cksum,
1108 aa->aa_requested_nob,
1113 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1114 aa->aa_page_count, aa->aa_ppga);
1118 /* The rest of this function executes only for OST_READs */
1119 if (rc > aa->aa_requested_nob) {
1120 CERROR("Unexpected rc %d (%d requested)\n", rc,
1121 aa->aa_requested_nob);
1125 if (rc != req->rq_bulk->bd_nob_transferred) {
1126 CERROR ("Unexpected rc %d (%d transferred)\n",
1127 rc, req->rq_bulk->bd_nob_transferred);
1131 if (rc < aa->aa_requested_nob)
1132 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1134 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1135 static int cksum_counter;
1136 __u32 server_cksum = body->oa.o_cksum;
1140 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1143 if (peer->nid == req->rq_bulk->bd_sender) {
1147 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1150 if (server_cksum == ~0 && rc > 0) {
1151 CERROR("Protocol error: server %s set the 'checksum' "
1152 "bit, but didn't send a checksum. Not fatal, "
1153 "but please tell CFS.\n",
1154 libcfs_nid2str(peer->nid));
1155 } else if (server_cksum != client_cksum) {
1156 LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s%s%s inum "
1157 LPU64"/"LPU64" object "LPU64"/"LPU64
1158 " extent ["LPU64"-"LPU64"]\n",
1159 req->rq_import->imp_obd->obd_name,
1160 libcfs_nid2str(peer->nid),
1162 body->oa.o_valid & OBD_MD_FLFID ?
1163 body->oa.o_fid : (__u64)0,
1164 body->oa.o_valid & OBD_MD_FLFID ?
1165 body->oa.o_generation :(__u64)0,
1167 body->oa.o_valid & OBD_MD_FLGROUP ?
1168 body->oa.o_gr : (__u64)0,
1169 aa->aa_ppga[0]->off,
1170 aa->aa_ppga[aa->aa_page_count-1]->off +
1171 aa->aa_ppga[aa->aa_page_count-1]->count -
1173 CERROR("client %x, server %x\n",
1174 client_cksum, server_cksum);
1176 aa->aa_oa->o_cksum = client_cksum;
1180 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1183 } else if (unlikely(client_cksum)) {
1184 static int cksum_missed;
1187 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1188 CERROR("Checksum %u requested from %s but not sent\n",
1189 cksum_missed, libcfs_nid2str(peer->nid));
1195 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1200 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1201 struct lov_stripe_md *lsm,
1202 obd_count page_count, struct brw_page **pga)
1204 struct ptlrpc_request *request;
1205 int rc, retries = 5; /* lprocfs? */
1209 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1210 page_count, pga, &request);
1214 rc = ptlrpc_queue_wait(request);
1216 if (rc == -ETIMEDOUT && request->rq_resend) {
1217 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1218 ptlrpc_req_finished(request);
1222 rc = osc_brw_fini_request(request, rc);
1224 ptlrpc_req_finished(request);
1225 if (rc == -EAGAIN) {
1233 int osc_brw_redo_request(struct ptlrpc_request *request,
1234 struct osc_brw_async_args *aa)
1236 struct ptlrpc_request *new_req;
1237 struct ptlrpc_request_set *set = request->rq_set;
1238 struct osc_brw_async_args *new_aa;
1239 struct osc_async_page *oap;
1243 if (aa->aa_retries-- <= 0) {
1244 CERROR("too many checksum retries, returning error\n");
1248 DEBUG_REQ(D_ERROR, request, "redo for checksum error");
1249 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1250 if (oap->oap_request != NULL) {
1251 LASSERTF(request == oap->oap_request,
1252 "request %p != oap_request %p\n",
1253 request, oap->oap_request);
1254 if (oap->oap_interrupted) {
1255 ptlrpc_mark_interrupted(oap->oap_request);
1264 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1265 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1266 aa->aa_cli, aa->aa_oa,
1267 NULL /* lsm unused by osc currently */,
1268 aa->aa_page_count, aa->aa_ppga, &new_req);
1272 /* New request takes over pga and oaps from old request.
1273 * Note that copying a list_head doesn't work, need to move it... */
1274 new_req->rq_interpret_reply = request->rq_interpret_reply;
1275 new_req->rq_async_args = request->rq_async_args;
1276 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1277 INIT_LIST_HEAD(&new_aa->aa_oaps);
1278 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1279 INIT_LIST_HEAD(&aa->aa_oaps);
1281 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1282 if (oap->oap_request) {
1283 ptlrpc_req_finished(oap->oap_request);
1284 oap->oap_request = ptlrpc_request_addref(new_req);
1288 ptlrpc_set_add_req(set, new_req);
1293 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1295 struct osc_brw_async_args *aa = data;
1299 rc = osc_brw_fini_request(request, rc);
1300 if (rc == -EAGAIN) {
1301 rc = osc_brw_redo_request(request, aa);
1306 spin_lock(&aa->aa_cli->cl_loi_list_lock);
1307 for (i = 0; i < aa->aa_page_count; i++)
1308 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1309 spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1311 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1316 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1317 struct lov_stripe_md *lsm, obd_count page_count,
1318 struct brw_page **pga, struct ptlrpc_request_set *set)
1320 struct ptlrpc_request *request;
1321 struct client_obd *cli = &exp->exp_obd->u.cli;
1325 /* Consume write credits even if doing a sync write -
1326 * otherwise we may run out of space on OST due to grant. */
1327 if (cmd == OBD_BRW_WRITE) {
1328 spin_lock(&cli->cl_loi_list_lock);
1329 for (i = 0; i < page_count; i++) {
1330 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1331 osc_consume_write_grant(cli, pga[i]);
1333 spin_unlock(&cli->cl_loi_list_lock);
1336 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1337 page_count, pga, &request);
1340 request->rq_interpret_reply = brw_interpret;
1341 ptlrpc_set_add_req(set, request);
1342 } else if (cmd == OBD_BRW_WRITE) {
1343 spin_lock(&cli->cl_loi_list_lock);
1344 for (i = 0; i < page_count; i++)
1345 osc_release_write_grant(cli, pga[i], 0);
1346 spin_unlock(&cli->cl_loi_list_lock);
1353 * ugh, we want disk allocation on the target to happen in offset order. we'll
1354 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1355 * fine for our small page arrays and doesn't require allocation. its an
1356 * insertion sort that swaps elements that are strides apart, shrinking the
1357 * stride down until its '1' and the array is sorted.
1359 static void sort_brw_pages(struct brw_page **array, int num)
1362 struct brw_page *tmp;
1366 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1371 for (i = stride ; i < num ; i++) {
1374 while (j >= stride && array[j-stride]->off > tmp->off) {
1375 array[j] = array[j - stride];
1380 } while (stride > 1);
1383 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1389 LASSERT (pages > 0);
1390 offset = pg[i]->off & (~CFS_PAGE_MASK);
1394 if (pages == 0) /* that's all */
1397 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1398 return count; /* doesn't end on page boundary */
1401 offset = pg[i]->off & (~CFS_PAGE_MASK);
1402 if (offset != 0) /* doesn't start on page boundary */
1409 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1411 struct brw_page **ppga;
1414 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1418 for (i = 0; i < count; i++)
1423 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1425 LASSERT(ppga != NULL);
1426 OBD_FREE(ppga, sizeof(*ppga) * count);
1429 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1430 obd_count page_count, struct brw_page *pga,
1431 struct obd_trans_info *oti)
1433 struct obdo *saved_oa = NULL;
1434 struct brw_page **ppga, **orig;
1435 struct obd_import *imp = class_exp2cliimp(exp);
1436 struct client_obd *cli = &imp->imp_obd->u.cli;
1437 int rc, page_count_orig;
1440 if (cmd & OBD_BRW_CHECK) {
1441 /* The caller just wants to know if there's a chance that this
1442 * I/O can succeed */
1444 if (imp == NULL || imp->imp_invalid)
1449 /* test_brw with a failed create can trip this, maybe others. */
1450 LASSERT(cli->cl_max_pages_per_rpc);
1454 orig = ppga = osc_build_ppga(pga, page_count);
1457 page_count_orig = page_count;
1459 sort_brw_pages(ppga, page_count);
1460 while (page_count) {
1461 obd_count pages_per_brw;
1463 if (page_count > cli->cl_max_pages_per_rpc)
1464 pages_per_brw = cli->cl_max_pages_per_rpc;
1466 pages_per_brw = page_count;
1468 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1470 if (saved_oa != NULL) {
1471 /* restore previously saved oa */
1472 *oinfo->oi_oa = *saved_oa;
1473 } else if (page_count > pages_per_brw) {
1474 /* save a copy of oa (brw will clobber it) */
1475 saved_oa = obdo_alloc();
1476 if (saved_oa == NULL)
1477 GOTO(out, rc = -ENOMEM);
1478 *saved_oa = *oinfo->oi_oa;
1481 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1482 pages_per_brw, ppga);
1487 page_count -= pages_per_brw;
1488 ppga += pages_per_brw;
1492 osc_release_ppga(orig, page_count_orig);
1494 if (saved_oa != NULL)
1495 obdo_free(saved_oa);
1500 static int osc_brw_async(int cmd, struct obd_export *exp,
1501 struct obd_info *oinfo, obd_count page_count,
1502 struct brw_page *pga, struct obd_trans_info *oti,
1503 struct ptlrpc_request_set *set)
1505 struct brw_page **ppga, **orig;
1506 int page_count_orig;
1510 if (cmd & OBD_BRW_CHECK) {
1511 /* The caller just wants to know if there's a chance that this
1512 * I/O can succeed */
1513 struct obd_import *imp = class_exp2cliimp(exp);
1515 if (imp == NULL || imp->imp_invalid)
1520 orig = ppga = osc_build_ppga(pga, page_count);
1523 page_count_orig = page_count;
1525 sort_brw_pages(ppga, page_count);
1526 while (page_count) {
1527 struct brw_page **copy;
1528 obd_count pages_per_brw;
1530 pages_per_brw = min_t(obd_count, page_count,
1531 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1533 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1535 /* use ppga only if single RPC is going to fly */
1536 if (pages_per_brw != page_count_orig || ppga != orig) {
1537 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1539 GOTO(out, rc = -ENOMEM);
1540 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1544 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1545 pages_per_brw, copy, set);
1549 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1554 /* we passed it to async_internal() which is
1555 * now responsible for releasing memory */
1559 page_count -= pages_per_brw;
1560 ppga += pages_per_brw;
1564 osc_release_ppga(orig, page_count_orig);
1568 static void osc_check_rpcs(struct client_obd *cli);
1570 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1571 * the dirty accounting. Writeback completes or truncate happens before
1572 * writing starts. Must be called with the loi lock held. */
1573 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1576 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1579 /* This maintains the lists of pending pages to read/write for a given object
1580 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1581 * to quickly find objects that are ready to send an RPC. */
1582 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1588 if (lop->lop_num_pending == 0)
1591 /* if we have an invalid import we want to drain the queued pages
1592 * by forcing them through rpcs that immediately fail and complete
1593 * the pages. recovery relies on this to empty the queued pages
1594 * before canceling the locks and evicting down the llite pages */
1595 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1598 /* stream rpcs in queue order as long as as there is an urgent page
1599 * queued. this is our cheap solution for good batching in the case
1600 * where writepage marks some random page in the middle of the file
1601 * as urgent because of, say, memory pressure */
1602 if (!list_empty(&lop->lop_urgent)) {
1603 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1607 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1608 optimal = cli->cl_max_pages_per_rpc;
1609 if (cmd & OBD_BRW_WRITE) {
1610 /* trigger a write rpc stream as long as there are dirtiers
1611 * waiting for space. as they're waiting, they're not going to
1612 * create more pages to coallesce with what's waiting.. */
1613 if (!list_empty(&cli->cl_cache_waiters)) {
1614 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1618 /* +16 to avoid triggering rpcs that would want to include pages
1619 * that are being queued but which can't be made ready until
1620 * the queuer finishes with the page. this is a wart for
1621 * llite::commit_write() */
1624 if (lop->lop_num_pending >= optimal)
1630 static void on_list(struct list_head *item, struct list_head *list,
1633 if (list_empty(item) && should_be_on)
1634 list_add_tail(item, list);
1635 else if (!list_empty(item) && !should_be_on)
1636 list_del_init(item);
1639 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1640 * can find pages to build into rpcs quickly */
1641 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1643 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1644 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1645 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1647 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1648 loi->loi_write_lop.lop_num_pending);
1650 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1651 loi->loi_read_lop.lop_num_pending);
1654 static void lop_update_pending(struct client_obd *cli,
1655 struct loi_oap_pages *lop, int cmd, int delta)
1657 lop->lop_num_pending += delta;
1658 if (cmd & OBD_BRW_WRITE)
1659 cli->cl_pending_w_pages += delta;
1661 cli->cl_pending_r_pages += delta;
1664 /* this is called when a sync waiter receives an interruption. Its job is to
1665 * get the caller woken as soon as possible. If its page hasn't been put in an
1666 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1667 * desiring interruption which will forcefully complete the rpc once the rpc
1669 static void osc_occ_interrupted(struct oig_callback_context *occ)
1671 struct osc_async_page *oap;
1672 struct loi_oap_pages *lop;
1673 struct lov_oinfo *loi;
1676 /* XXX member_of() */
1677 oap = list_entry(occ, struct osc_async_page, oap_occ);
1679 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1681 oap->oap_interrupted = 1;
1683 /* ok, it's been put in an rpc. only one oap gets a request reference */
1684 if (oap->oap_request != NULL) {
1685 ptlrpc_mark_interrupted(oap->oap_request);
1686 ptlrpcd_wake(oap->oap_request);
1690 /* we don't get interruption callbacks until osc_trigger_group_io()
1691 * has been called and put the sync oaps in the pending/urgent lists.*/
1692 if (!list_empty(&oap->oap_pending_item)) {
1693 list_del_init(&oap->oap_pending_item);
1694 list_del_init(&oap->oap_urgent_item);
1697 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1698 &loi->loi_write_lop : &loi->loi_read_lop;
1699 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1700 loi_list_maint(oap->oap_cli, oap->oap_loi);
1702 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1703 oap->oap_oig = NULL;
1707 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1710 /* this is trying to propogate async writeback errors back up to the
1711 * application. As an async write fails we record the error code for later if
1712 * the app does an fsync. As long as errors persist we force future rpcs to be
1713 * sync so that the app can get a sync error and break the cycle of queueing
1714 * pages for which writeback will fail. */
1715 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1722 ar->ar_force_sync = 1;
1723 ar->ar_min_xid = ptlrpc_sample_next_xid();
1728 if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1729 ar->ar_force_sync = 0;
1732 static void osc_oap_to_pending(struct osc_async_page *oap)
1734 struct loi_oap_pages *lop;
1736 if (oap->oap_cmd & OBD_BRW_WRITE)
1737 lop = &oap->oap_loi->loi_write_lop;
1739 lop = &oap->oap_loi->loi_read_lop;
1741 if (oap->oap_async_flags & ASYNC_URGENT)
1742 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1743 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1744 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1747 /* this must be called holding the loi list lock to give coverage to exit_cache,
1748 * async_flag maintenance, and oap_request */
1749 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1750 struct osc_async_page *oap, int sent, int rc)
1753 oap->oap_async_flags = 0;
1754 oap->oap_interrupted = 0;
1756 if (oap->oap_cmd & OBD_BRW_WRITE) {
1757 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1758 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1761 if (oap->oap_request != NULL) {
1762 ptlrpc_req_finished(oap->oap_request);
1763 oap->oap_request = NULL;
1766 if (rc == 0 && oa != NULL) {
1767 if (oa->o_valid & OBD_MD_FLBLOCKS)
1768 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1769 if (oa->o_valid & OBD_MD_FLMTIME)
1770 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1771 if (oa->o_valid & OBD_MD_FLATIME)
1772 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1773 if (oa->o_valid & OBD_MD_FLCTIME)
1774 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1778 osc_exit_cache(cli, oap, sent);
1779 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1780 oap->oap_oig = NULL;
1785 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1786 oap->oap_cmd, oa, rc);
1788 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1789 * I/O on the page could start, but OSC calls it under lock
1790 * and thus we can add oap back to pending safely */
1792 /* upper layer wants to leave the page on pending queue */
1793 osc_oap_to_pending(oap);
1795 osc_exit_cache(cli, oap, sent);
1799 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1801 struct osc_brw_async_args *aa = data;
1802 struct osc_async_page *oap, *tmp;
1803 struct client_obd *cli;
1806 rc = osc_brw_fini_request(request, rc);
1807 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1808 if (rc == -EAGAIN) {
1809 rc = osc_brw_redo_request(request, aa);
1817 client_obd_list_lock(&cli->cl_loi_list_lock);
1819 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1820 * is called so we know whether to go to sync BRWs or wait for more
1821 * RPCs to complete */
1822 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1823 cli->cl_w_in_flight--;
1825 cli->cl_r_in_flight--;
1827 /* the caller may re-use the oap after the completion call so
1828 * we need to clean it up a little */
1829 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1830 list_del_init(&oap->oap_rpc_item);
1831 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1834 osc_wake_cache_waiters(cli);
1835 osc_check_rpcs(cli);
1837 client_obd_list_unlock(&cli->cl_loi_list_lock);
1839 obdo_free(aa->aa_oa);
1843 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1847 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1848 struct list_head *rpc_list,
1849 int page_count, int cmd)
1851 struct ptlrpc_request *req;
1852 struct brw_page **pga = NULL;
1853 struct osc_brw_async_args *aa;
1854 struct obdo *oa = NULL;
1855 struct obd_async_page_ops *ops = NULL;
1856 void *caller_data = NULL;
1857 struct osc_async_page *oap;
1861 LASSERT(!list_empty(rpc_list));
1863 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1865 RETURN(ERR_PTR(-ENOMEM));
1869 GOTO(out, req = ERR_PTR(-ENOMEM));
1872 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1874 ops = oap->oap_caller_ops;
1875 caller_data = oap->oap_caller_data;
1877 pga[i] = &oap->oap_brw_page;
1878 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1879 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1880 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1884 /* always get the data for the obdo for the rpc */
1885 LASSERT(ops != NULL);
1886 ops->ap_fill_obdo(caller_data, cmd, oa);
1888 sort_brw_pages(pga, page_count);
1889 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1891 CERROR("prep_req failed: %d\n", rc);
1892 GOTO(out, req = ERR_PTR(rc));
1895 /* Need to update the timestamps after the request is built in case
1896 * we race with setattr (locally or in queue at OST). If OST gets
1897 * later setattr before earlier BRW (as determined by the request xid),
1898 * the OST will not use BRW timestamps. Sadly, there is no obvious
1899 * way to do this in a single call. bug 10150 */
1900 ops->ap_update_obdo(caller_data, cmd, oa,
1901 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1903 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1904 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1905 INIT_LIST_HEAD(&aa->aa_oaps);
1906 list_splice(rpc_list, &aa->aa_oaps);
1907 INIT_LIST_HEAD(rpc_list);
1914 OBD_FREE(pga, sizeof(*pga) * page_count);
1919 /* the loi lock is held across this function but it's allowed to release
1920 * and reacquire it during its work */
1921 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1922 int cmd, struct loi_oap_pages *lop)
1924 struct ptlrpc_request *req;
1925 obd_count page_count = 0;
1926 struct osc_async_page *oap = NULL, *tmp;
1927 struct osc_brw_async_args *aa;
1928 struct obd_async_page_ops *ops;
1929 CFS_LIST_HEAD(rpc_list);
1930 unsigned int ending_offset;
1931 unsigned starting_offset = 0;
1934 /* first we find the pages we're allowed to work with */
1935 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
1936 ops = oap->oap_caller_ops;
1938 LASSERT(oap->oap_magic == OAP_MAGIC);
1940 /* in llite being 'ready' equates to the page being locked
1941 * until completion unlocks it. commit_write submits a page
1942 * as not ready because its unlock will happen unconditionally
1943 * as the call returns. if we race with commit_write giving
1944 * us that page we dont' want to create a hole in the page
1945 * stream, so we stop and leave the rpc to be fired by
1946 * another dirtier or kupdated interval (the not ready page
1947 * will still be on the dirty list). we could call in
1948 * at the end of ll_file_write to process the queue again. */
1949 if (!(oap->oap_async_flags & ASYNC_READY)) {
1950 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1952 CDEBUG(D_INODE, "oap %p page %p returned %d "
1953 "instead of ready\n", oap,
1957 /* llite is telling us that the page is still
1958 * in commit_write and that we should try
1959 * and put it in an rpc again later. we
1960 * break out of the loop so we don't create
1961 * a hole in the sequence of pages in the rpc
1966 /* the io isn't needed.. tell the checks
1967 * below to complete the rpc with EINTR */
1968 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1969 oap->oap_count = -EINTR;
1972 oap->oap_async_flags |= ASYNC_READY;
1975 LASSERTF(0, "oap %p page %p returned %d "
1976 "from make_ready\n", oap,
1984 * Page submitted for IO has to be locked. Either by
1985 * ->ap_make_ready() or by higher layers.
1987 * XXX nikita: this assertion should be adjusted when lustre
1988 * starts using PG_writeback for pages being written out.
1990 #if defined(__KERNEL__) && defined(__LINUX__)
1991 LASSERT(PageLocked(oap->oap_page));
1993 /* If there is a gap at the start of this page, it can't merge
1994 * with any previous page, so we'll hand the network a
1995 * "fragmented" page array that it can't transfer in 1 RDMA */
1996 if (page_count != 0 && oap->oap_page_off != 0)
1999 /* take the page out of our book-keeping */
2000 list_del_init(&oap->oap_pending_item);
2001 lop_update_pending(cli, lop, cmd, -1);
2002 list_del_init(&oap->oap_urgent_item);
2004 if (page_count == 0)
2005 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2006 (PTLRPC_MAX_BRW_SIZE - 1);
2008 /* ask the caller for the size of the io as the rpc leaves. */
2009 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2011 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2012 if (oap->oap_count <= 0) {
2013 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2015 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2019 /* now put the page back in our accounting */
2020 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2021 if (++page_count >= cli->cl_max_pages_per_rpc)
2024 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2025 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2026 * have the same alignment as the initial writes that allocated
2027 * extents on the server. */
2028 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2029 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2030 if (ending_offset == 0)
2033 /* If there is a gap at the end of this page, it can't merge
2034 * with any subsequent pages, so we'll hand the network a
2035 * "fragmented" page array that it can't transfer in 1 RDMA */
2036 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2040 osc_wake_cache_waiters(cli);
2042 if (page_count == 0)
2045 loi_list_maint(cli, loi);
2047 client_obd_list_unlock(&cli->cl_loi_list_lock);
2049 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2051 /* this should happen rarely and is pretty bad, it makes the
2052 * pending list not follow the dirty order */
2053 client_obd_list_lock(&cli->cl_loi_list_lock);
2054 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2055 list_del_init(&oap->oap_rpc_item);
2057 /* queued sync pages can be torn down while the pages
2058 * were between the pending list and the rpc */
2059 if (oap->oap_interrupted) {
2060 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2061 osc_ap_completion(cli, NULL, oap, 0,
2065 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2067 loi_list_maint(cli, loi);
2068 RETURN(PTR_ERR(req));
2071 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2072 if (cmd == OBD_BRW_READ) {
2073 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2074 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2075 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2076 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2077 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2079 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2080 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2081 cli->cl_w_in_flight);
2082 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2083 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2084 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2087 client_obd_list_lock(&cli->cl_loi_list_lock);
2089 if (cmd == OBD_BRW_READ)
2090 cli->cl_r_in_flight++;
2092 cli->cl_w_in_flight++;
2094 /* queued sync pages can be torn down while the pages
2095 * were between the pending list and the rpc */
2097 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2098 /* only one oap gets a request reference */
2101 if (oap->oap_interrupted && !req->rq_intr) {
2102 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2104 ptlrpc_mark_interrupted(req);
2108 tmp->oap_request = ptlrpc_request_addref(req);
2110 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2111 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2113 req->rq_interpret_reply = brw_interpret_oap;
2114 ptlrpcd_add_req(req);
2118 #define LOI_DEBUG(LOI, STR, args...) \
2119 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2120 !list_empty(&(LOI)->loi_cli_item), \
2121 (LOI)->loi_write_lop.lop_num_pending, \
2122 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2123 (LOI)->loi_read_lop.lop_num_pending, \
2124 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2127 /* This is called by osc_check_rpcs() to find which objects have pages that
2128 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2129 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2132 /* first return all objects which we already know to have
2133 * pages ready to be stuffed into rpcs */
2134 if (!list_empty(&cli->cl_loi_ready_list))
2135 RETURN(list_entry(cli->cl_loi_ready_list.next,
2136 struct lov_oinfo, loi_cli_item));
2138 /* then if we have cache waiters, return all objects with queued
2139 * writes. This is especially important when many small files
2140 * have filled up the cache and not been fired into rpcs because
2141 * they don't pass the nr_pending/object threshhold */
2142 if (!list_empty(&cli->cl_cache_waiters) &&
2143 !list_empty(&cli->cl_loi_write_list))
2144 RETURN(list_entry(cli->cl_loi_write_list.next,
2145 struct lov_oinfo, loi_write_item));
2147 /* then return all queued objects when we have an invalid import
2148 * so that they get flushed */
2149 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2150 if (!list_empty(&cli->cl_loi_write_list))
2151 RETURN(list_entry(cli->cl_loi_write_list.next,
2152 struct lov_oinfo, loi_write_item));
2153 if (!list_empty(&cli->cl_loi_read_list))
2154 RETURN(list_entry(cli->cl_loi_read_list.next,
2155 struct lov_oinfo, loi_read_item));
2160 /* called with the loi list lock held */
2161 static void osc_check_rpcs(struct client_obd *cli)
2163 struct lov_oinfo *loi;
2164 int rc = 0, race_counter = 0;
2167 while ((loi = osc_next_loi(cli)) != NULL) {
2168 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2170 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2173 /* attempt some read/write balancing by alternating between
2174 * reads and writes in an object. The makes_rpc checks here
2175 * would be redundant if we were getting read/write work items
2176 * instead of objects. we don't want send_oap_rpc to drain a
2177 * partial read pending queue when we're given this object to
2178 * do io on writes while there are cache waiters */
2179 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2180 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2181 &loi->loi_write_lop);
2189 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2190 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2191 &loi->loi_read_lop);
2200 /* attempt some inter-object balancing by issueing rpcs
2201 * for each object in turn */
2202 if (!list_empty(&loi->loi_cli_item))
2203 list_del_init(&loi->loi_cli_item);
2204 if (!list_empty(&loi->loi_write_item))
2205 list_del_init(&loi->loi_write_item);
2206 if (!list_empty(&loi->loi_read_item))
2207 list_del_init(&loi->loi_read_item);
2209 loi_list_maint(cli, loi);
2211 /* send_oap_rpc fails with 0 when make_ready tells it to
2212 * back off. llite's make_ready does this when it tries
2213 * to lock a page queued for write that is already locked.
2214 * we want to try sending rpcs from many objects, but we
2215 * don't want to spin failing with 0. */
2216 if (race_counter == 10)
2222 /* we're trying to queue a page in the osc so we're subject to the
2223 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2224 * If the osc's queued pages are already at that limit, then we want to sleep
2225 * until there is space in the osc's queue for us. We also may be waiting for
2226 * write credits from the OST if there are RPCs in flight that may return some
2227 * before we fall back to sync writes.
2229 * We need this know our allocation was granted in the presence of signals */
2230 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2234 client_obd_list_lock(&cli->cl_loi_list_lock);
2235 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2236 client_obd_list_unlock(&cli->cl_loi_list_lock);
2240 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2241 * grant or cache space. */
2242 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2243 struct osc_async_page *oap)
2245 struct osc_cache_waiter ocw;
2246 struct l_wait_info lwi = { 0 };
2249 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2250 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2251 cli->cl_dirty_max, obd_max_dirty_pages,
2252 cli->cl_lost_grant, cli->cl_avail_grant);
2254 /* force the caller to try sync io. this can jump the list
2255 * of queued writes and create a discontiguous rpc stream */
2256 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2257 loi->loi_ar.ar_force_sync)
2260 /* Hopefully normal case - cache space and write credits available */
2261 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2262 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2263 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2264 /* account for ourselves */
2265 osc_consume_write_grant(cli, &oap->oap_brw_page);
2269 /* Make sure that there are write rpcs in flight to wait for. This
2270 * is a little silly as this object may not have any pending but
2271 * other objects sure might. */
2272 if (cli->cl_w_in_flight) {
2273 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2274 cfs_waitq_init(&ocw.ocw_waitq);
2278 loi_list_maint(cli, loi);
2279 osc_check_rpcs(cli);
2280 client_obd_list_unlock(&cli->cl_loi_list_lock);
2282 CDEBUG(D_CACHE, "sleeping for cache space\n");
2283 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2285 client_obd_list_lock(&cli->cl_loi_list_lock);
2286 if (!list_empty(&ocw.ocw_entry)) {
2287 list_del(&ocw.ocw_entry);
2296 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2297 struct lov_oinfo *loi, cfs_page_t *page,
2298 obd_off offset, struct obd_async_page_ops *ops,
2299 void *data, void **res)
2301 struct osc_async_page *oap;
2305 return size_round(sizeof(*oap));
2308 oap->oap_magic = OAP_MAGIC;
2309 oap->oap_cli = &exp->exp_obd->u.cli;
2312 oap->oap_caller_ops = ops;
2313 oap->oap_caller_data = data;
2315 oap->oap_page = page;
2316 oap->oap_obj_off = offset;
2318 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2319 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2320 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2322 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2324 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2328 struct osc_async_page *oap_from_cookie(void *cookie)
2330 struct osc_async_page *oap = cookie;
2331 if (oap->oap_magic != OAP_MAGIC)
2332 return ERR_PTR(-EINVAL);
2336 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2337 struct lov_oinfo *loi, void *cookie,
2338 int cmd, obd_off off, int count,
2339 obd_flag brw_flags, enum async_flags async_flags)
2341 struct client_obd *cli = &exp->exp_obd->u.cli;
2342 struct osc_async_page *oap;
2346 oap = oap_from_cookie(cookie);
2348 RETURN(PTR_ERR(oap));
2350 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2353 if (!list_empty(&oap->oap_pending_item) ||
2354 !list_empty(&oap->oap_urgent_item) ||
2355 !list_empty(&oap->oap_rpc_item))
2358 /* check if the file's owner/group is over quota */
2359 #ifdef HAVE_QUOTA_SUPPORT
2360 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2361 struct obd_async_page_ops *ops;
2368 ops = oap->oap_caller_ops;
2369 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2370 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2381 loi = lsm->lsm_oinfo[0];
2383 client_obd_list_lock(&cli->cl_loi_list_lock);
2386 oap->oap_page_off = off;
2387 oap->oap_count = count;
2388 oap->oap_brw_flags = brw_flags;
2389 oap->oap_async_flags = async_flags;
2391 if (cmd & OBD_BRW_WRITE) {
2392 rc = osc_enter_cache(cli, loi, oap);
2394 client_obd_list_unlock(&cli->cl_loi_list_lock);
2399 osc_oap_to_pending(oap);
2400 loi_list_maint(cli, loi);
2402 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2405 osc_check_rpcs(cli);
2406 client_obd_list_unlock(&cli->cl_loi_list_lock);
2411 /* aka (~was & now & flag), but this is more clear :) */
2412 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2414 static int osc_set_async_flags(struct obd_export *exp,
2415 struct lov_stripe_md *lsm,
2416 struct lov_oinfo *loi, void *cookie,
2417 obd_flag async_flags)
2419 struct client_obd *cli = &exp->exp_obd->u.cli;
2420 struct loi_oap_pages *lop;
2421 struct osc_async_page *oap;
2425 oap = oap_from_cookie(cookie);
2427 RETURN(PTR_ERR(oap));
2430 * bug 7311: OST-side locking is only supported for liblustre for now
2431 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2432 * implementation has to handle case where OST-locked page was picked
2433 * up by, e.g., ->writepage().
2435 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2436 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2439 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2443 loi = lsm->lsm_oinfo[0];
2445 if (oap->oap_cmd & OBD_BRW_WRITE) {
2446 lop = &loi->loi_write_lop;
2448 lop = &loi->loi_read_lop;
2451 client_obd_list_lock(&cli->cl_loi_list_lock);
2453 if (list_empty(&oap->oap_pending_item))
2454 GOTO(out, rc = -EINVAL);
2456 if ((oap->oap_async_flags & async_flags) == async_flags)
2459 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2460 oap->oap_async_flags |= ASYNC_READY;
2462 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2463 if (list_empty(&oap->oap_rpc_item)) {
2464 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2465 loi_list_maint(cli, loi);
2469 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2470 oap->oap_async_flags);
2472 osc_check_rpcs(cli);
2473 client_obd_list_unlock(&cli->cl_loi_list_lock);
2477 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2478 struct lov_oinfo *loi,
2479 struct obd_io_group *oig, void *cookie,
2480 int cmd, obd_off off, int count,
2482 obd_flag async_flags)
2484 struct client_obd *cli = &exp->exp_obd->u.cli;
2485 struct osc_async_page *oap;
2486 struct loi_oap_pages *lop;
2490 oap = oap_from_cookie(cookie);
2492 RETURN(PTR_ERR(oap));
2494 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2497 if (!list_empty(&oap->oap_pending_item) ||
2498 !list_empty(&oap->oap_urgent_item) ||
2499 !list_empty(&oap->oap_rpc_item))
2503 loi = lsm->lsm_oinfo[0];
2505 client_obd_list_lock(&cli->cl_loi_list_lock);
2508 oap->oap_page_off = off;
2509 oap->oap_count = count;
2510 oap->oap_brw_flags = brw_flags;
2511 oap->oap_async_flags = async_flags;
2513 if (cmd & OBD_BRW_WRITE)
2514 lop = &loi->loi_write_lop;
2516 lop = &loi->loi_read_lop;
2518 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2519 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2521 rc = oig_add_one(oig, &oap->oap_occ);
2524 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2525 oap, oap->oap_page, rc);
2527 client_obd_list_unlock(&cli->cl_loi_list_lock);
2532 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2533 struct loi_oap_pages *lop, int cmd)
2535 struct list_head *pos, *tmp;
2536 struct osc_async_page *oap;
2538 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2539 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2540 list_del(&oap->oap_pending_item);
2541 osc_oap_to_pending(oap);
2543 loi_list_maint(cli, loi);
2546 static int osc_trigger_group_io(struct obd_export *exp,
2547 struct lov_stripe_md *lsm,
2548 struct lov_oinfo *loi,
2549 struct obd_io_group *oig)
2551 struct client_obd *cli = &exp->exp_obd->u.cli;
2555 loi = lsm->lsm_oinfo[0];
2557 client_obd_list_lock(&cli->cl_loi_list_lock);
2559 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2560 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2562 osc_check_rpcs(cli);
2563 client_obd_list_unlock(&cli->cl_loi_list_lock);
2568 static int osc_teardown_async_page(struct obd_export *exp,
2569 struct lov_stripe_md *lsm,
2570 struct lov_oinfo *loi, void *cookie)
2572 struct client_obd *cli = &exp->exp_obd->u.cli;
2573 struct loi_oap_pages *lop;
2574 struct osc_async_page *oap;
2578 oap = oap_from_cookie(cookie);
2580 RETURN(PTR_ERR(oap));
2583 loi = lsm->lsm_oinfo[0];
2585 if (oap->oap_cmd & OBD_BRW_WRITE) {
2586 lop = &loi->loi_write_lop;
2588 lop = &loi->loi_read_lop;
2591 client_obd_list_lock(&cli->cl_loi_list_lock);
2593 if (!list_empty(&oap->oap_rpc_item))
2594 GOTO(out, rc = -EBUSY);
2596 osc_exit_cache(cli, oap, 0);
2597 osc_wake_cache_waiters(cli);
2599 if (!list_empty(&oap->oap_urgent_item)) {
2600 list_del_init(&oap->oap_urgent_item);
2601 oap->oap_async_flags &= ~ASYNC_URGENT;
2603 if (!list_empty(&oap->oap_pending_item)) {
2604 list_del_init(&oap->oap_pending_item);
2605 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2607 loi_list_maint(cli, loi);
2609 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2611 client_obd_list_unlock(&cli->cl_loi_list_lock);
2615 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2618 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2621 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2624 lock_res_and_lock(lock);
2627 /* Liang XXX: Darwin and Winnt checking should be added */
2628 if (lock->l_ast_data && lock->l_ast_data != data) {
2629 struct inode *new_inode = data;
2630 struct inode *old_inode = lock->l_ast_data;
2631 if (!(old_inode->i_state & I_FREEING))
2632 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2633 LASSERTF(old_inode->i_state & I_FREEING,
2634 "Found existing inode %p/%lu/%u state %lu in lock: "
2635 "setting data to %p/%lu/%u\n", old_inode,
2636 old_inode->i_ino, old_inode->i_generation,
2638 new_inode, new_inode->i_ino, new_inode->i_generation);
2642 lock->l_ast_data = data;
2643 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2644 unlock_res_and_lock(lock);
2645 LDLM_LOCK_PUT(lock);
2648 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2649 ldlm_iterator_t replace, void *data)
2651 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2652 struct obd_device *obd = class_exp2obd(exp);
2654 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2658 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2664 /* The request was created before ldlm_cli_enqueue call. */
2665 if (rc == ELDLM_LOCK_ABORTED) {
2666 struct ldlm_reply *rep;
2668 /* swabbed by ldlm_cli_enqueue() */
2669 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2670 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2672 LASSERT(rep != NULL);
2673 if (rep->lock_policy_res1)
2674 rc = rep->lock_policy_res1;
2678 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2679 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2680 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2681 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2682 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2685 /* Call the update callback. */
2686 rc = oinfo->oi_cb_up(oinfo, rc);
2690 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2691 struct osc_enqueue_args *aa, int rc)
2693 int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2694 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2695 struct ldlm_lock *lock;
2697 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2699 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2701 /* Complete obtaining the lock procedure. */
2702 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2704 &aa->oa_ei->ei_flags,
2705 &lsm->lsm_oinfo[0]->loi_lvb,
2706 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2707 lustre_swab_ost_lvb,
2708 aa->oa_oi->oi_lockh, rc);
2710 /* Complete osc stuff. */
2711 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2713 /* Release the lock for async request. */
2714 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2715 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2717 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2718 aa->oa_oi->oi_lockh, req, aa);
2719 LDLM_LOCK_PUT(lock);
2723 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2724 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2725 * other synchronous requests, however keeping some locks and trying to obtain
2726 * others may take a considerable amount of time in a case of ost failure; and
2727 * when other sync requests do not get released lock from a client, the client
2728 * is excluded from the cluster -- such scenarious make the life difficult, so
2729 * release locks just after they are obtained. */
2730 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2731 struct obd_enqueue_info *einfo)
2733 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2734 struct obd_device *obd = exp->exp_obd;
2735 struct ldlm_reply *rep;
2736 struct ptlrpc_request *req = NULL;
2737 int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2741 /* Filesystem lock extents are extended to page boundaries so that
2742 * dealing with the page cache is a little smoother. */
2743 oinfo->oi_policy.l_extent.start -=
2744 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2745 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2747 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2750 /* Next, search for already existing extent locks that will cover us */
2751 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2752 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2755 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2758 /* I would like to be able to ASSERT here that rss <=
2759 * kms, but I can't, for reasons which are explained in
2763 /* We already have a lock, and it's referenced */
2764 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2766 /* For async requests, decref the lock. */
2767 if (einfo->ei_rqset)
2768 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2773 /* If we're trying to read, we also search for an existing PW lock. The
2774 * VFS and page cache already protect us locally, so lots of readers/
2775 * writers can share a single PW lock.
2777 * There are problems with conversion deadlocks, so instead of
2778 * converting a read lock to a write lock, we'll just enqueue a new
2781 * At some point we should cancel the read lock instead of making them
2782 * send us a blocking callback, but there are problems with canceling
2783 * locks out from other users right now, too. */
2785 if (einfo->ei_mode == LCK_PR) {
2786 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
2787 &res_id, einfo->ei_type, &oinfo->oi_policy,
2788 LCK_PW, oinfo->oi_lockh);
2790 /* FIXME: This is not incredibly elegant, but it might
2791 * be more elegant than adding another parameter to
2792 * lock_match. I want a second opinion. */
2793 /* addref the lock only if not async requests. */
2794 if (!einfo->ei_rqset)
2795 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2796 osc_set_data_with_check(oinfo->oi_lockh,
2799 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2800 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2808 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2809 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
2811 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2812 LDLM_ENQUEUE, 2, size, NULL);
2816 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2817 size[DLM_REPLY_REC_OFF] =
2818 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2819 ptlrpc_req_set_repsize(req, 3, size);
2822 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2823 einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2825 rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
2826 &oinfo->oi_policy, einfo->ei_mode,
2827 &einfo->ei_flags, einfo->ei_cb_bl,
2828 einfo->ei_cb_cp, einfo->ei_cb_gl,
2830 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2831 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2832 lustre_swab_ost_lvb, oinfo->oi_lockh,
2833 einfo->ei_rqset ? 1 : 0);
2834 if (einfo->ei_rqset) {
2836 struct osc_enqueue_args *aa;
2837 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2838 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2843 req->rq_interpret_reply = osc_enqueue_interpret;
2844 ptlrpc_set_add_req(einfo->ei_rqset, req);
2845 } else if (intent) {
2846 ptlrpc_req_finished(req);
2851 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2853 ptlrpc_req_finished(req);
2858 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2859 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2860 int *flags, void *data, struct lustre_handle *lockh)
2862 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2863 struct obd_device *obd = exp->exp_obd;
2865 int lflags = *flags;
2868 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2870 /* Filesystem lock extents are extended to page boundaries so that
2871 * dealing with the page cache is a little smoother */
2872 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2873 policy->l_extent.end |= ~CFS_PAGE_MASK;
2875 /* Next, search for already existing extent locks that will cover us */
2876 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2877 policy, mode, lockh);
2879 //if (!(*flags & LDLM_FL_TEST_LOCK))
2880 osc_set_data_with_check(lockh, data, lflags);
2883 /* If we're trying to read, we also search for an existing PW lock. The
2884 * VFS and page cache already protect us locally, so lots of readers/
2885 * writers can share a single PW lock. */
2886 if (mode == LCK_PR) {
2887 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2889 policy, LCK_PW, lockh);
2890 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2891 /* FIXME: This is not incredibly elegant, but it might
2892 * be more elegant than adding another parameter to
2893 * lock_match. I want a second opinion. */
2894 osc_set_data_with_check(lockh, data, lflags);
2895 ldlm_lock_addref(lockh, LCK_PR);
2896 ldlm_lock_decref(lockh, LCK_PW);
2902 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2903 __u32 mode, struct lustre_handle *lockh)
2907 if (unlikely(mode == LCK_GROUP))
2908 ldlm_lock_decref_and_cancel(lockh, mode);
2910 ldlm_lock_decref(lockh, mode);
2915 static int osc_cancel_unused(struct obd_export *exp,
2916 struct lov_stripe_md *lsm, int flags, void *opaque)
2918 struct obd_device *obd = class_exp2obd(exp);
2919 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2921 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2925 static int osc_join_lru(struct obd_export *exp,
2926 struct lov_stripe_md *lsm, int join)
2928 struct obd_device *obd = class_exp2obd(exp);
2929 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2931 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2934 static int osc_statfs_interpret(struct ptlrpc_request *req,
2935 struct osc_async_args *aa, int rc)
2937 struct obd_statfs *msfs;
2943 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2944 lustre_swab_obd_statfs);
2946 CERROR("Can't unpack obd_statfs\n");
2947 GOTO(out, rc = -EPROTO);
2950 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2952 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2956 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2957 __u64 max_age, struct ptlrpc_request_set *rqset)
2959 struct ptlrpc_request *req;
2960 struct osc_async_args *aa;
2961 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
2964 /* We could possibly pass max_age in the request (as an absolute
2965 * timestamp or a "seconds.usec ago") so the target can avoid doing
2966 * extra calls into the filesystem if that isn't necessary (e.g.
2967 * during mount that would help a bit). Having relative timestamps
2968 * is not so great if request processing is slow, while absolute
2969 * timestamps are not ideal because they need time synchronization. */
2970 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2971 OST_STATFS, 1, NULL, NULL);
2975 ptlrpc_req_set_repsize(req, 2, size);
2976 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2978 req->rq_interpret_reply = osc_statfs_interpret;
2979 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2980 aa = (struct osc_async_args *)&req->rq_async_args;
2983 ptlrpc_set_add_req(rqset, req);
2987 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2990 struct obd_statfs *msfs;
2991 struct ptlrpc_request *req;
2992 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
2995 /* We could possibly pass max_age in the request (as an absolute
2996 * timestamp or a "seconds.usec ago") so the target can avoid doing
2997 * extra calls into the filesystem if that isn't necessary (e.g.
2998 * during mount that would help a bit). Having relative timestamps
2999 * is not so great if request processing is slow, while absolute
3000 * timestamps are not ideal because they need time synchronization. */
3001 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3002 OST_STATFS, 1, NULL, NULL);
3006 ptlrpc_req_set_repsize(req, 2, size);
3007 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3009 rc = ptlrpc_queue_wait(req);
3013 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3014 lustre_swab_obd_statfs);
3016 CERROR("Can't unpack obd_statfs\n");
3017 GOTO(out, rc = -EPROTO);
3020 memcpy(osfs, msfs, sizeof(*osfs));
3024 ptlrpc_req_finished(req);
3028 /* Retrieve object striping information.
3030 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3031 * the maximum number of OST indices which will fit in the user buffer.
3032 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3034 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3036 struct lov_user_md lum, *lumk;
3037 int rc = 0, lum_size;
3043 if (copy_from_user(&lum, lump, sizeof(lum)))
3046 if (lum.lmm_magic != LOV_USER_MAGIC)
3049 if (lum.lmm_stripe_count > 0) {
3050 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3051 OBD_ALLOC(lumk, lum_size);
3055 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3057 lum_size = sizeof(lum);
3061 lumk->lmm_object_id = lsm->lsm_object_id;
3062 lumk->lmm_stripe_count = 1;
3064 if (copy_to_user(lump, lumk, lum_size))
3068 OBD_FREE(lumk, lum_size);
3074 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3075 void *karg, void *uarg)
3077 struct obd_device *obd = exp->exp_obd;
3078 struct obd_ioctl_data *data = karg;
3082 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3085 if (!try_module_get(THIS_MODULE)) {
3086 CERROR("Can't get module. Is it alive?");
3091 case OBD_IOC_LOV_GET_CONFIG: {
3093 struct lov_desc *desc;
3094 struct obd_uuid uuid;
3098 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3099 GOTO(out, err = -EINVAL);
3101 data = (struct obd_ioctl_data *)buf;
3103 if (sizeof(*desc) > data->ioc_inllen1) {
3104 obd_ioctl_freedata(buf, len);
3105 GOTO(out, err = -EINVAL);
3108 if (data->ioc_inllen2 < sizeof(uuid)) {
3109 obd_ioctl_freedata(buf, len);
3110 GOTO(out, err = -EINVAL);
3113 desc = (struct lov_desc *)data->ioc_inlbuf1;
3114 desc->ld_tgt_count = 1;
3115 desc->ld_active_tgt_count = 1;
3116 desc->ld_default_stripe_count = 1;
3117 desc->ld_default_stripe_size = 0;
3118 desc->ld_default_stripe_offset = 0;
3119 desc->ld_pattern = 0;
3120 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3122 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3124 err = copy_to_user((void *)uarg, buf, len);
3127 obd_ioctl_freedata(buf, len);
3130 case LL_IOC_LOV_SETSTRIPE:
3131 err = obd_alloc_memmd(exp, karg);
3135 case LL_IOC_LOV_GETSTRIPE:
3136 err = osc_getstripe(karg, uarg);
3138 case OBD_IOC_CLIENT_RECOVER:
3139 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3144 case IOC_OSC_SET_ACTIVE:
3145 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3148 case OBD_IOC_POLL_QUOTACHECK:
3149 err = lquota_poll_check(quota_interface, exp,
3150 (struct if_quotacheck *)karg);
3153 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3154 cmd, cfs_curproc_comm());
3155 GOTO(out, err = -ENOTTY);
3158 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3161 module_put(THIS_MODULE);
3166 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3167 void *key, __u32 *vallen, void *val)
3170 if (!vallen || !val)
3173 if (keylen > strlen("lock_to_stripe") &&
3174 strcmp(key, "lock_to_stripe") == 0) {
3175 __u32 *stripe = val;
3176 *vallen = sizeof(*stripe);
3179 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3180 struct ptlrpc_request *req;
3182 char *bufs[2] = { NULL, key };
3183 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3185 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3186 OST_GET_INFO, 2, size, bufs);
3190 size[REPLY_REC_OFF] = *vallen;
3191 ptlrpc_req_set_repsize(req, 2, size);
3192 rc = ptlrpc_queue_wait(req);
3196 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3197 lustre_swab_ost_last_id);
3198 if (reply == NULL) {
3199 CERROR("Can't unpack OST last ID\n");
3200 GOTO(out, rc = -EPROTO);
3202 *((obd_id *)val) = *reply;
3204 ptlrpc_req_finished(req);
3210 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3213 struct llog_ctxt *ctxt;
3214 struct obd_import *imp = req->rq_import;
3220 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3223 rc = llog_initiator_connect(ctxt);
3225 CERROR("cannot establish connection for "
3226 "ctxt %p: %d\n", ctxt, rc);
3229 spin_lock(&imp->imp_lock);
3230 imp->imp_server_timeout = 1;
3231 imp->imp_pingable = 1;
3232 spin_unlock(&imp->imp_lock);
3233 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3238 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3239 void *key, obd_count vallen, void *val,
3240 struct ptlrpc_request_set *set)
3242 struct ptlrpc_request *req;
3243 struct obd_device *obd = exp->exp_obd;
3244 struct obd_import *imp = class_exp2cliimp(exp);
3245 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3246 char *bufs[3] = { NULL, key, val };
3249 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3251 if (KEY_IS(KEY_NEXT_ID)) {
3252 if (vallen != sizeof(obd_id))
3254 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3255 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3256 exp->exp_obd->obd_name,
3257 obd->u.cli.cl_oscc.oscc_next_id);
3262 if (KEY_IS("unlinked")) {
3263 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3264 spin_lock(&oscc->oscc_lock);
3265 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3266 spin_unlock(&oscc->oscc_lock);
3270 if (KEY_IS(KEY_INIT_RECOV)) {
3271 if (vallen != sizeof(int))
3273 spin_lock(&imp->imp_lock);
3274 imp->imp_initial_recov = *(int *)val;
3275 spin_unlock(&imp->imp_lock);
3276 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3277 exp->exp_obd->obd_name,
3278 imp->imp_initial_recov);
3282 if (KEY_IS("checksum")) {
3283 if (vallen != sizeof(int))
3285 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3292 /* We pass all other commands directly to OST. Since nobody calls osc
3293 methods directly and everybody is supposed to go through LOV, we
3294 assume lov checked invalid values for us.
3295 The only recognised values so far are evict_by_nid and mds_conn.
3296 Even if something bad goes through, we'd get a -EINVAL from OST
3299 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3304 if (KEY_IS("mds_conn"))
3305 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3307 ptlrpc_req_set_repsize(req, 1, NULL);
3308 ptlrpc_set_add_req(set, req);
3309 ptlrpc_check_set(set);
3315 static struct llog_operations osc_size_repl_logops = {
3316 lop_cancel: llog_obd_repl_cancel
3319 static struct llog_operations osc_mds_ost_orig_logops;
3320 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3321 int count, struct llog_catid *catid,
3322 struct obd_uuid *uuid)
3327 spin_lock(&obd->obd_dev_lock);
3328 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3329 osc_mds_ost_orig_logops = llog_lvfs_ops;
3330 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3331 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3332 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3333 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3335 spin_unlock(&obd->obd_dev_lock);
3337 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3338 &catid->lci_logid, &osc_mds_ost_orig_logops);
3340 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3344 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3345 &osc_size_repl_logops);
3347 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3350 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3351 obd->obd_name, tgt->obd_name, count, catid, rc);
3352 CERROR("logid "LPX64":0x%x\n",
3353 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3358 static int osc_llog_finish(struct obd_device *obd, int count)
3360 struct llog_ctxt *ctxt;
3361 int rc = 0, rc2 = 0;
3364 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3366 rc = llog_cleanup(ctxt);
3368 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3370 rc2 = llog_cleanup(ctxt);
3377 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3378 struct obd_uuid *cluuid,
3379 struct obd_connect_data *data)
3381 struct client_obd *cli = &obd->u.cli;
3383 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3386 client_obd_list_lock(&cli->cl_loi_list_lock);
3387 data->ocd_grant = cli->cl_avail_grant ?:
3388 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3389 lost_grant = cli->cl_lost_grant;
3390 cli->cl_lost_grant = 0;
3391 client_obd_list_unlock(&cli->cl_loi_list_lock);
3393 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3394 "cl_lost_grant: %ld\n", data->ocd_grant,
3395 cli->cl_avail_grant, lost_grant);
3396 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3397 " ocd_grant: %d\n", data->ocd_connect_flags,
3398 data->ocd_version, data->ocd_grant);
3404 static int osc_disconnect(struct obd_export *exp)
3406 struct obd_device *obd = class_exp2obd(exp);
3407 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3410 if (obd->u.cli.cl_conn_count == 1)
3411 /* flush any remaining cancel messages out to the target */
3412 llog_sync(ctxt, exp);
3414 rc = client_disconnect_export(exp);
3418 static int osc_import_event(struct obd_device *obd,
3419 struct obd_import *imp,
3420 enum obd_import_event event)
3422 struct client_obd *cli;
3426 LASSERT(imp->imp_obd == obd);
3429 case IMP_EVENT_DISCON: {
3430 /* Only do this on the MDS OSC's */
3431 if (imp->imp_server_timeout) {
3432 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3434 spin_lock(&oscc->oscc_lock);
3435 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3436 spin_unlock(&oscc->oscc_lock);
3441 case IMP_EVENT_INACTIVE: {
3442 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3445 case IMP_EVENT_INVALIDATE: {
3446 struct ldlm_namespace *ns = obd->obd_namespace;
3450 client_obd_list_lock(&cli->cl_loi_list_lock);
3451 cli->cl_avail_grant = 0;
3452 cli->cl_lost_grant = 0;
3453 /* all pages go to failing rpcs due to the invalid import */
3454 osc_check_rpcs(cli);
3455 client_obd_list_unlock(&cli->cl_loi_list_lock);
3457 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3461 case IMP_EVENT_ACTIVE: {
3462 /* Only do this on the MDS OSC's */
3463 if (imp->imp_server_timeout) {
3464 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3466 spin_lock(&oscc->oscc_lock);
3467 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3468 spin_unlock(&oscc->oscc_lock);
3470 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3473 case IMP_EVENT_OCD: {
3474 struct obd_connect_data *ocd = &imp->imp_connect_data;
3476 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3477 osc_init_grant(&obd->u.cli, ocd);
3480 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3481 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3483 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3487 CERROR("Unknown import event %d\n", event);
3493 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3499 rc = ptlrpcd_addref();
3503 rc = client_obd_setup(obd, len, buf);
3507 struct lprocfs_static_vars lvars;
3508 struct client_obd *cli = &obd->u.cli;
3510 lprocfs_init_vars(osc, &lvars);
3511 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3512 lproc_osc_attach_seqstat(obd);
3513 ptlrpc_lprocfs_register_obd(obd);
3517 /* We need to allocate a few requests more, because
3518 brw_interpret_oap tries to create new requests before freeing
3519 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3520 reserved, but I afraid that might be too much wasted RAM
3521 in fact, so 2 is just my guess and still should work. */
3522 cli->cl_import->imp_rq_pool =
3523 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3525 ptlrpc_add_rqs_to_pool);
3531 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3537 case OBD_CLEANUP_EARLY: {
3538 struct obd_import *imp;
3539 imp = obd->u.cli.cl_import;
3540 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3541 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3542 ptlrpc_deactivate_import(imp);
3545 case OBD_CLEANUP_EXPORTS: {
3546 /* If we set up but never connected, the
3547 client import will not have been cleaned. */
3548 if (obd->u.cli.cl_import) {
3549 struct obd_import *imp;
3550 imp = obd->u.cli.cl_import;
3551 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3553 ptlrpc_invalidate_import(imp);
3554 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3555 class_destroy_import(imp);
3556 obd->u.cli.cl_import = NULL;
3560 case OBD_CLEANUP_SELF_EXP:
3561 rc = obd_llog_finish(obd, 0);
3563 CERROR("failed to cleanup llogging subsystems\n");
3565 case OBD_CLEANUP_OBD:
3571 int osc_cleanup(struct obd_device *obd)
3573 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3577 ptlrpc_lprocfs_unregister_obd(obd);
3578 lprocfs_obd_cleanup(obd);
3580 spin_lock(&oscc->oscc_lock);
3581 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3582 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3583 spin_unlock(&oscc->oscc_lock);
3585 /* free memory of osc quota cache */
3586 lquota_cleanup(quota_interface, obd);
3588 rc = client_obd_cleanup(obd);
3594 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3596 struct lustre_cfg *lcfg = buf;
3597 struct lprocfs_static_vars lvars;
3600 lprocfs_init_vars(osc, &lvars);
3602 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3606 struct obd_ops osc_obd_ops = {
3607 .o_owner = THIS_MODULE,
3608 .o_setup = osc_setup,
3609 .o_precleanup = osc_precleanup,
3610 .o_cleanup = osc_cleanup,
3611 .o_add_conn = client_import_add_conn,
3612 .o_del_conn = client_import_del_conn,
3613 .o_connect = client_connect_import,
3614 .o_reconnect = osc_reconnect,
3615 .o_disconnect = osc_disconnect,
3616 .o_statfs = osc_statfs,
3617 .o_statfs_async = osc_statfs_async,
3618 .o_packmd = osc_packmd,
3619 .o_unpackmd = osc_unpackmd,
3620 .o_create = osc_create,
3621 .o_destroy = osc_destroy,
3622 .o_getattr = osc_getattr,
3623 .o_getattr_async = osc_getattr_async,
3624 .o_setattr = osc_setattr,
3625 .o_setattr_async = osc_setattr_async,
3627 .o_brw_async = osc_brw_async,
3628 .o_prep_async_page = osc_prep_async_page,
3629 .o_queue_async_io = osc_queue_async_io,
3630 .o_set_async_flags = osc_set_async_flags,
3631 .o_queue_group_io = osc_queue_group_io,
3632 .o_trigger_group_io = osc_trigger_group_io,
3633 .o_teardown_async_page = osc_teardown_async_page,
3634 .o_punch = osc_punch,
3636 .o_enqueue = osc_enqueue,
3637 .o_match = osc_match,
3638 .o_change_cbdata = osc_change_cbdata,
3639 .o_cancel = osc_cancel,
3640 .o_cancel_unused = osc_cancel_unused,
3641 .o_join_lru = osc_join_lru,
3642 .o_iocontrol = osc_iocontrol,
3643 .o_get_info = osc_get_info,
3644 .o_set_info_async = osc_set_info_async,
3645 .o_import_event = osc_import_event,
3646 .o_llog_init = osc_llog_init,
3647 .o_llog_finish = osc_llog_finish,
3648 .o_process_config = osc_process_config,
3651 int __init osc_init(void)
3653 struct lprocfs_static_vars lvars;
3657 lprocfs_init_vars(osc, &lvars);
3659 request_module("lquota");
3660 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3661 lquota_init(quota_interface);
3662 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3664 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3667 if (quota_interface)
3668 PORTAL_SYMBOL_PUT(osc_quota_interface);
3676 static void /*__exit*/ osc_exit(void)
3678 lquota_exit(quota_interface);
3679 if (quota_interface)
3680 PORTAL_SYMBOL_PUT(osc_quota_interface);
3682 class_unregister_type(LUSTRE_OSC_NAME);
3685 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3686 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3687 MODULE_LICENSE("GPL");
3689 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);