/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
 * Author Peter Braam <braam@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software.  If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you.  See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * license text for more details.
 *
 * For testing and management it is treated as an obd_device, although
 * it does not export a full OBD method table (the requests are coming
 * in over the wire, so object target modules do not have a full
 * method table).
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <libcfs/kp30.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (lmmp == NULL)
                RETURN(lmm_size);

        if (*lmmp != NULL && lsm == NULL) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (*lmmp == NULL) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (*lmmp == NULL)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

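/*
 * Illustrative sketch (not part of the original file): the packmd/unpackmd
 * pair follows the usual obd convention that a NULL result pointer is a
 * size query, a non-NULL buffer plus NULL source frees it, and otherwise
 * the buffer is (re)allocated and filled.  A hypothetical caller, going
 * through the method table as obd_packmd()/obd_unpackmd() would:
 *
 *      struct lov_mds_md *lmm = NULL;
 *      struct lov_stripe_md *lsm_out = NULL;
 *      int lmm_size = osc_packmd(exp, NULL, lsm);    // size query only
 *      int rc = osc_packmd(exp, &lmm, lsm);          // alloc + pack (LE)
 *      rc = osc_unpackmd(exp, &lsm_out, lmm, rc);    // LE -> CPU order
 */
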
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

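/*
 * Illustrative sketch (assumed caller, not from this file): the async
 * getattr is driven through a ptlrpc request set; oi_cb_up is invoked
 * from osc_getattr_interpret() when the reply arrives.  my_done_cb is a
 * hypothetical completion callback:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      struct obd_info oinfo = { .oi_oa = oa, .oi_cb_up = my_done_cb };
 *      rc = osc_getattr_async(exp, &oinfo, set);
 *      if (rc == 0)
 *              rc = ptlrpc_set_wait(set);    // or let ptlrpcd drive it
 *      ptlrpc_set_destroy(set);
 */
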
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

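/*
 * Illustrative note (sketch, not from this file): because the punch extent
 * travels in the overloaded o_size/o_blocks fields, a truncate of a stripe
 * object down to new_size would be expressed by the caller as:
 *
 *      oinfo.oi_policy.l_extent.start = new_size;
 *      oinfo.oi_policy.l_extent.end   = OBD_OBJECT_EOF;
 *
 * i.e. "punch everything from new_size to the end of the object"; the OST
 * decodes o_size/o_blocks back into the extent when OBD_MD_FLSIZE and
 * OBD_MD_FLBLOCKS are both set.
 */
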
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oa, sizeof(*oa));
        ptlrpc_req_set_repsize(req, 2, size);

        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

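/*
 * Worked example (illustrative, assuming 4k pages): with
 * cl_max_pages_per_rpc = 256 and cl_max_rpcs_in_flight = 8,
 * max_in_flight = (256 << 12) * (8 + 1) = 9 MB.  If cl_dirty_max is,
 * say, 32 MB, then o_undirty = max(32 MB, 9 MB) = 32 MB: the client
 * advertises how much more it may want to dirty so the OST can size the
 * grant it hands back accordingly.
 */
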
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

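/*
 * Worked example (illustrative): with CFS_PAGE_SIZE = 4096 and an OST
 * blocksize of 1024, a short write of count = 1500 at page offset 0 works
 * out as:
 *
 *      count = 1500 + 0 = 1500;        end = 1500 & 1023 = 476;
 *      count += 1024 - 476             -> 2048 (two whole 1k blocks)
 *      cl_lost_grant += 4096 - 2048    -> 2048 bytes
 *
 * The client had reserved a whole page (4096) of grant, but the OST only
 * charges the whole blocks actually spanned (2048), so the difference is
 * reported back to the server via o_dropped in osc_announce_cached().
 */
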
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;
        ENTRY;

        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    ((atomic_read(&obd_dirty_pages)+1) > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

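/*
 * Worked example (illustrative): a 3-page read of 12288 bytes that comes
 * back with nob_read = 5000 leaves page 0 (4096 bytes) untouched, zeroes
 * bytes 904..4095 of page 1 (only 5000 - 4096 = 904 bytes of it were
 * valid), and zeroes all of page 2.  The zeroes are what a reader of a
 * sparse file expects past the written extent of this stripe.
 */
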
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

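/*
 * Illustrative note: two brw_pages at (off 0, count 4096) and (off 4096,
 * count 4096) with identical flags merge into a single niobuf_remote of
 * len 8192 in osc_brw_prep_request(), so a fully contiguous RPC carries
 * exactly one niobuf no matter how many pages it spans.  OBD_BRW_FROM_GRANT
 * is masked out of the warning comparison above because it is purely
 * client-side accounting state, not an I/O attribute the OST needs to see.
 */
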
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT(pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;
        ENTRY;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool : NULL;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc,
                                   4, size, NULL, pool);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (cli->cl_import->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT(page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __KERNEL__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_retries = 5;     /* retry for checksum errors; lprocfs? */
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga)
{
        __u32 new_cksum;
        char *msg;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        new_cksum = osc_checksum_bulk(nob, page_count, pga);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original";

        LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
                       " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
                       msg, libcfs_nid2str(peer->nid),
                       oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                       oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
                       oa->o_id,
                       oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                       pga[0]->off,
                       pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                char *via;
                char *router;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga);

                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s%s%s inum "
                                       LPU64"/"LPU64" object "LPU64"/"LPU64
                                       " extent ["LPU64"-"LPU64"]\n",
                                       req->rq_import->imp_obd->obd_name,
                                       libcfs_nid2str(peer->nid),
                                       via, router,
                                       body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                       body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                       body->oa.o_id,
                                       body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                       aa->aa_ppga[0]->off,
                                       aa->aa_ppga[aa->aa_page_count-1]->off +
                                       aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));

        RETURN(rc);
}

static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga)
{
        struct ptlrpc_request *request;
        int                    rc, retries = 5; /* lprocfs? */
        ENTRY;

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(request);

        if (rc == -ETIMEDOUT && request->rq_resend) {
                DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
                ptlrpc_req_finished(request);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(request, rc);

        ptlrpc_req_finished(request);
        if (rc == -EAGAIN) {
                if (retries-- > 0)
                        goto restart_bulk;
                rc = -EIO;
        }
        RETURN(rc);
}

int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (aa->aa_retries-- <= 0) {
                CERROR("too many checksum retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for checksum error");
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_mark_interrupted(oap->oap_request);
                                rc = -EINTR;
                                break;
                        }
                }
        }
        if (rc)
                RETURN(rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                          OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req);
        if (rc)
                RETURN(rc);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        ptlrpc_set_add_req(set, new_req);
        RETURN(0);
}

static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        if (rc == -EAGAIN) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        spin_lock(&aa->aa_cli->cl_loi_list_lock);
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        spin_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}

static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *request;
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        int                    rc, i;
        ENTRY;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);
                }
                spin_unlock(&cli->cl_loi_list_lock);
        }

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &request);
        if (rc == 0) {
                request->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, request);
        } else if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                spin_unlock(&cli->cl_loi_list_lock);
        }

        RETURN(rc);
}

/*
 * ugh, we want disk allocation on the target to happen in offset order.  we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
        int stride, i, j;
        struct brw_page *tmp;

        if (num == 1)
                return;
        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
                ;

        do {
                stride /= 3;
                for (i = stride ; i < num ; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j-stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
}

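/*
 * Illustrative note: the stride sequence is Knuth's h = 3h + 1, so for
 * num = 100 the setup loop builds 1, 4, 13, 40, 121 and the sort then
 * runs with strides 40, 13, 4, 1.  The final stride-1 pass is a plain
 * insertion sort over an almost-sorted array, which is where shellsort
 * gets its efficiency.
 */
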
static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
{
        int count = 1;
        int offset;
        int i = 0;

        LASSERT(pages > 0);
        offset = pg[i]->off & (~CFS_PAGE_MASK);

        for (;;) {
                pages--;
                if (pages == 0)         /* that's all */
                        return count;

                if (offset + pg[i]->count < CFS_PAGE_SIZE)
                        return count;   /* doesn't end on page boundary */

                i++;
                offset = pg[i]->off & (~CFS_PAGE_MASK);
                if (offset != 0)        /* doesn't start on page boundary */
                        return count;

                count++;
        }
}

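/*
 * Illustrative note: given pages with (page offset, count) of (0, 4096),
 * (0, 4096), (0, 2048) this returns 3 -- a trailing partial page may end
 * the transfer.  But (0, 4096), (0, 2048), (0, 4096) returns 2, because a
 * partial page in the middle would leave a hole that bulk RDMA cannot
 * express in a single transfer, so the caller splits the RPC there.
 */
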
static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
{
        struct brw_page **ppga;
        int i;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);
        if (ppga == NULL)
                return NULL;

        for (i = 0; i < count; i++)
                ppga[i] = pga + i;
        return ppga;
}

static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}

static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        saved_oa = obdo_alloc();
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                obdo_free(saved_oa);

        RETURN(rc);
}

static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set);

                if (rc != 0) {
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}

static void osc_check_rpcs(struct client_obd *cli);

/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        ENTRY;
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
        EXIT;
}

/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page.  this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}

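/*
 * Summary (illustrative, restating the conditions above): lop_makes_rpc()
 * fires an RPC when any of these hold -- the import is invalid (drain
 * everything), an urgent page is queued, cache waiters exist (writes
 * only), or lop_num_pending reaches cl_max_pages_per_rpc (+16 slack for
 * writes so pages still sitting in commit_write don't trigger a
 * premature, short RPC).
 */
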
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);
}

/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}

static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}

/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has completed */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}

/* this is trying to propagate async writeback errors back up to the
 * application.  As an async write fails we record the error code for later if
 * the app does an fsync.  As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
                           int rc)
{
        if (rc) {
                if (!ar->ar_rc)
                        ar->ar_rc = rc;

                ar->ar_force_sync = 1;
                ar->ar_min_xid = ptlrpc_sample_next_xid();
                return;
        }

        if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
}

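/*
 * Illustrative lifecycle (sketch): suppose an async write fails with
 * -ENOSPC.  osc_process_ar() latches ar_rc = -ENOSPC, sets ar_force_sync
 * and samples ar_min_xid.  Every later write is forced synchronous until
 * a request whose xid is >= ar_min_xid (i.e. one queued after the failure
 * was observed) completes cleanly, which clears ar_force_sync; an fsync
 * in the meantime finds ar_rc and returns the error to the application.
 */
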
static void osc_oap_to_pending(struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
        else
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}

/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        ENTRY;
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
        }

        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}

static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_async_page *oap, *tmp;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
        if (rc == -EAGAIN) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
                /* redo failed: fall through and complete the pages in error */
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        obdo_free(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}

static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        int i, rc;
        ENTRY;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

        RETURN(req);

out:
        if (oa != NULL)
                obdo_free(oa);
        if (pga != NULL)
                OBD_FREE(pga, sizeof(*pga) * page_count);
        RETURN(req);
}

/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                       "instead of ready\n", oap,
                                       oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked.  Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}

#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)

2124 /* This is called by osc_check_rpcs() to find which objects have pages that
2125 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2126 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2129 /* first return all objects which we already know to have
2130 * pages ready to be stuffed into rpcs */
2131 if (!list_empty(&cli->cl_loi_ready_list))
2132 RETURN(list_entry(cli->cl_loi_ready_list.next,
2133 struct lov_oinfo, loi_cli_item));
2135 /* then if we have cache waiters, return all objects with queued
2136 * writes. This is especially important when many small files
2137 * have filled up the cache and not been fired into rpcs because
2138 * they don't pass the nr_pending/object threshold */
2139 if (!list_empty(&cli->cl_cache_waiters) &&
2140 !list_empty(&cli->cl_loi_write_list))
2141 RETURN(list_entry(cli->cl_loi_write_list.next,
2142 struct lov_oinfo, loi_write_item));
2144 /* then return all queued objects when we have an invalid import
2145 * so that they get flushed */
2146 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2147 if (!list_empty(&cli->cl_loi_write_list))
2148 RETURN(list_entry(cli->cl_loi_write_list.next,
2149 struct lov_oinfo, loi_write_item));
2150 if (!list_empty(&cli->cl_loi_read_list))
2151 RETURN(list_entry(cli->cl_loi_read_list.next,
2152 struct lov_oinfo, loi_read_item));
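/* To summarize the selection policy above: objects are returned in strict
 * priority order --
 *   1. cl_loi_ready_list: objects already known to have RPC-ready pages;
 *   2. cl_loi_write_list: any object with queued writes, but only while
 *      there are cache waiters to unblock;
 *   3. cl_loi_write_list then cl_loi_read_list: everything queued, but
 *      only when the import is invalid and must be drained.
 * With nothing to do, NULL is returned and osc_check_rpcs() stops. */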
2157 /* called with the loi list lock held */
2158 static void osc_check_rpcs(struct client_obd *cli)
2160 struct lov_oinfo *loi;
2161 int rc = 0, race_counter = 0;
2164 while ((loi = osc_next_loi(cli)) != NULL) {
2165 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2167 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2170 /* attempt some read/write balancing by alternating between
2171 * reads and writes in an object. The makes_rpc checks here
2172 * would be redundant if we were getting read/write work items
2173 * instead of objects. We don't want send_oap_rpc to drain a
2174 * partial read pending queue when we're given this object to
2175 * do write IO on while there are cache waiters */
2176 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2177 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2178 &loi->loi_write_lop);
2186 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2187 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2188 &loi->loi_read_lop);
2197 /* attempt some inter-object balancing by issuing rpcs
2198 * for each object in turn */
2199 if (!list_empty(&loi->loi_cli_item))
2200 list_del_init(&loi->loi_cli_item);
2201 if (!list_empty(&loi->loi_write_item))
2202 list_del_init(&loi->loi_write_item);
2203 if (!list_empty(&loi->loi_read_item))
2204 list_del_init(&loi->loi_read_item);
2206 loi_list_maint(cli, loi);
2208 /* send_oap_rpc fails with 0 when make_ready tells it to
2209 * back off. llite's make_ready does this when it tries
2210 * to lock a page queued for write that is already locked.
2211 * we want to try sending rpcs from many objects, but we
2212 * don't want to spin failing with 0. */
2213 if (race_counter == 10)
2219 /* we're trying to queue a page in the osc so we're subject to the
2220 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2221 * If the osc's queued pages are already at that limit, then we want to sleep
2222 * until there is space in the osc's queue for us. We also may be waiting for
2223 * write credits from the OST if there are RPCs in flight that may return some
2224 * before we fall back to sync writes.
2226 * We need this to know whether our allocation was granted in the presence of signals */
2227 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2231 client_obd_list_lock(&cli->cl_loi_list_lock);
2232 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2233 client_obd_list_unlock(&cli->cl_loi_list_lock);
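/* In other words, the waiter wakes for one of two reasons: its ocw_entry
 * was removed from cl_cache_waiters (presumably by osc_wake_cache_waiters()
 * once space was granted), or no RPCs remain in flight, in which case no
 * more grant is coming and the caller must fall back to sync IO. Both
 * checks happen under cl_loi_list_lock to avoid racing the wakeup path. */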
2237 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2238 * grant or cache space. */
2239 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2240 struct osc_async_page *oap)
2242 struct osc_cache_waiter ocw;
2243 struct l_wait_info lwi = { 0 };
2246 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2247 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2248 cli->cl_dirty_max, obd_max_dirty_pages,
2249 cli->cl_lost_grant, cli->cl_avail_grant);
2251 /* force the caller to try sync io. this can jump the list
2252 * of queued writes and create a discontiguous rpc stream */
2253 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2254 loi->loi_ar.ar_force_sync)
2257 /* Hopefully normal case - cache space and write credits available */
2258 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2259 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2260 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2261 /* account for ourselves */
2262 osc_consume_write_grant(cli, &oap->oap_brw_page);
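/* A rough accounting sketch, assuming 4K pages (the exact fields touched
 * are in osc_consume_write_grant(); this is only illustrative): queueing
 * one dirty page moves cl_dirty up and cl_avail_grant down by
 * CFS_PAGE_SIZE, e.g.
 *
 *     before:  cl_dirty = 24576, cl_avail_grant = 1048576
 *     after:   cl_dirty = 28672, cl_avail_grant = 1044480
 */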
2266 /* Make sure that there are write rpcs in flight to wait for. This
2267 * is a little silly, as this object may not have any pending RPCs
2268 * while other objects surely might. */
2269 if (cli->cl_w_in_flight) {
2270 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2271 cfs_waitq_init(&ocw.ocw_waitq);
2275 loi_list_maint(cli, loi);
2276 osc_check_rpcs(cli);
2277 client_obd_list_unlock(&cli->cl_loi_list_lock);
2279 CDEBUG(D_CACHE, "sleeping for cache space\n");
2280 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2282 client_obd_list_lock(&cli->cl_loi_list_lock);
2283 if (!list_empty(&ocw.ocw_entry)) {
2284 list_del(&ocw.ocw_entry);
2293 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2294 struct lov_oinfo *loi, cfs_page_t *page,
2295 obd_off offset, struct obd_async_page_ops *ops,
2296 void *data, void **res)
2298 struct osc_async_page *oap;
2302 return size_round(sizeof(*oap));
2305 oap->oap_magic = OAP_MAGIC;
2306 oap->oap_cli = &exp->exp_obd->u.cli;
2309 oap->oap_caller_ops = ops;
2310 oap->oap_caller_data = data;
2312 oap->oap_page = page;
2313 oap->oap_obj_off = offset;
2315 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2316 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2317 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2319 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2321 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2325 struct osc_async_page *oap_from_cookie(void *cookie)
2327 struct osc_async_page *oap = cookie;
2328 if (oap->oap_magic != OAP_MAGIC)
2329 return ERR_PTR(-EINVAL);
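/* A caller-side sketch of the cookie round trip (illustrative only):
 *
 *     void *cookie;
 *
 *     rc = osc_prep_async_page(exp, lsm, loi, page, offset, ops, data,
 *                              &cookie);
 *     ...
 *     rc = osc_queue_async_io(exp, lsm, loi, cookie, OBD_BRW_WRITE, 0,
 *                             CFS_PAGE_SIZE, 0, ASYNC_READY | ASYNC_URGENT);
 *
 * oap_from_cookie() recovers the osc_async_page on the way back in and
 * rejects anything whose oap_magic was not set by osc_prep_async_page(). */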
2333 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2334 struct lov_oinfo *loi, void *cookie,
2335 int cmd, obd_off off, int count,
2336 obd_flag brw_flags, enum async_flags async_flags)
2338 struct client_obd *cli = &exp->exp_obd->u.cli;
2339 struct osc_async_page *oap;
2343 oap = oap_from_cookie(cookie);
2345 RETURN(PTR_ERR(oap));
2347 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2350 if (!list_empty(&oap->oap_pending_item) ||
2351 !list_empty(&oap->oap_urgent_item) ||
2352 !list_empty(&oap->oap_rpc_item))
2355 /* check if the file's owner/group is over quota */
2356 #ifdef HAVE_QUOTA_SUPPORT
2357 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2358 struct obd_async_page_ops *ops;
2365 ops = oap->oap_caller_ops;
2366 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2367 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2378 loi = lsm->lsm_oinfo[0];
2380 client_obd_list_lock(&cli->cl_loi_list_lock);
2383 oap->oap_page_off = off;
2384 oap->oap_count = count;
2385 oap->oap_brw_flags = brw_flags;
2386 oap->oap_async_flags = async_flags;
2388 if (cmd & OBD_BRW_WRITE) {
2389 rc = osc_enter_cache(cli, loi, oap);
2391 client_obd_list_unlock(&cli->cl_loi_list_lock);
2396 osc_oap_to_pending(oap);
2397 loi_list_maint(cli, loi);
2399 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2402 osc_check_rpcs(cli);
2403 client_obd_list_unlock(&cli->cl_loi_list_lock);
2408 /* aka (~was & now & flag), but this is more clear :) */
2409 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
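/* For example, SETTING(0, ASYNC_READY, ASYNC_READY) is true (the flag is
 * being newly set) while SETTING(ASYNC_READY, ASYNC_READY, ASYNC_READY) is
 * false (it was already set) -- exactly (~was & now & flag) != 0. */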
2411 static int osc_set_async_flags(struct obd_export *exp,
2412 struct lov_stripe_md *lsm,
2413 struct lov_oinfo *loi, void *cookie,
2414 obd_flag async_flags)
2416 struct client_obd *cli = &exp->exp_obd->u.cli;
2417 struct loi_oap_pages *lop;
2418 struct osc_async_page *oap;
2422 oap = oap_from_cookie(cookie);
2424 RETURN(PTR_ERR(oap));
2427 * bug 7311: OST-side locking is only supported for liblustre for now
2428 * (and liblustre never calls obd_set_async_flags(), I hope); a generic
2429 * implementation has to handle the case where an OST-locked page was picked
2430 * up by, e.g., ->writepage().
2432 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2433 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2436 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2440 loi = lsm->lsm_oinfo[0];
2442 if (oap->oap_cmd & OBD_BRW_WRITE) {
2443 lop = &loi->loi_write_lop;
2445 lop = &loi->loi_read_lop;
2448 client_obd_list_lock(&cli->cl_loi_list_lock);
2450 if (list_empty(&oap->oap_pending_item))
2451 GOTO(out, rc = -EINVAL);
2453 if ((oap->oap_async_flags & async_flags) == async_flags)
2456 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2457 oap->oap_async_flags |= ASYNC_READY;
2459 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2460 if (list_empty(&oap->oap_rpc_item)) {
2461 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2462 loi_list_maint(cli, loi);
2466 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2467 oap->oap_async_flags);
2469 osc_check_rpcs(cli);
2470 client_obd_list_unlock(&cli->cl_loi_list_lock);
2474 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2475 struct lov_oinfo *loi,
2476 struct obd_io_group *oig, void *cookie,
2477 int cmd, obd_off off, int count,
2479 obd_flag async_flags)
2481 struct client_obd *cli = &exp->exp_obd->u.cli;
2482 struct osc_async_page *oap;
2483 struct loi_oap_pages *lop;
2487 oap = oap_from_cookie(cookie);
2489 RETURN(PTR_ERR(oap));
2491 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2494 if (!list_empty(&oap->oap_pending_item) ||
2495 !list_empty(&oap->oap_urgent_item) ||
2496 !list_empty(&oap->oap_rpc_item))
2500 loi = lsm->lsm_oinfo[0];
2502 client_obd_list_lock(&cli->cl_loi_list_lock);
2505 oap->oap_page_off = off;
2506 oap->oap_count = count;
2507 oap->oap_brw_flags = brw_flags;
2508 oap->oap_async_flags = async_flags;
2510 if (cmd & OBD_BRW_WRITE)
2511 lop = &loi->loi_write_lop;
2513 lop = &loi->loi_read_lop;
2515 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2516 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2518 rc = oig_add_one(oig, &oap->oap_occ);
2521 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2522 oap, oap->oap_page, rc);
2524 client_obd_list_unlock(&cli->cl_loi_list_lock);
2529 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2530 struct loi_oap_pages *lop, int cmd)
2532 struct list_head *pos, *tmp;
2533 struct osc_async_page *oap;
2535 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2536 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2537 list_del(&oap->oap_pending_item);
2538 osc_oap_to_pending(oap);
2540 loi_list_maint(cli, loi);
2543 static int osc_trigger_group_io(struct obd_export *exp,
2544 struct lov_stripe_md *lsm,
2545 struct lov_oinfo *loi,
2546 struct obd_io_group *oig)
2548 struct client_obd *cli = &exp->exp_obd->u.cli;
2552 loi = lsm->lsm_oinfo[0];
2554 client_obd_list_lock(&cli->cl_loi_list_lock);
2556 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2557 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2559 osc_check_rpcs(cli);
2560 client_obd_list_unlock(&cli->cl_loi_list_lock);
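/* A sketch of the intended group IO call sequence (error handling
 * omitted, arguments illustrative):
 *
 *     osc_queue_group_io(exp, lsm, loi, oig, cookie, OBD_BRW_READ,
 *                        off, count, brw_flags, ASYNC_GROUP_SYNC);
 *     ...queue any further pages belonging to the group...
 *     osc_trigger_group_io(exp, lsm, loi, oig);
 *
 * osc_queue_group_io() only parks pages on lop_pending_group; nothing is
 * sent until osc_trigger_group_io() moves them to the pending lists and
 * calls osc_check_rpcs(). */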
2565 static int osc_teardown_async_page(struct obd_export *exp,
2566 struct lov_stripe_md *lsm,
2567 struct lov_oinfo *loi, void *cookie)
2569 struct client_obd *cli = &exp->exp_obd->u.cli;
2570 struct loi_oap_pages *lop;
2571 struct osc_async_page *oap;
2575 oap = oap_from_cookie(cookie);
2577 RETURN(PTR_ERR(oap));
2580 loi = lsm->lsm_oinfo[0];
2582 if (oap->oap_cmd & OBD_BRW_WRITE) {
2583 lop = &loi->loi_write_lop;
2585 lop = &loi->loi_read_lop;
2588 client_obd_list_lock(&cli->cl_loi_list_lock);
2590 if (!list_empty(&oap->oap_rpc_item))
2591 GOTO(out, rc = -EBUSY);
2593 osc_exit_cache(cli, oap, 0);
2594 osc_wake_cache_waiters(cli);
2596 if (!list_empty(&oap->oap_urgent_item)) {
2597 list_del_init(&oap->oap_urgent_item);
2598 oap->oap_async_flags &= ~ASYNC_URGENT;
2600 if (!list_empty(&oap->oap_pending_item)) {
2601 list_del_init(&oap->oap_pending_item);
2602 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2604 loi_list_maint(cli, loi);
2606 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2608 client_obd_list_unlock(&cli->cl_loi_list_lock);
2612 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2615 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2618 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2621 lock_res_and_lock(lock);
2624 /* Liang XXX: Darwin and WinNT checks should be added */
2625 if (lock->l_ast_data && lock->l_ast_data != data) {
2626 struct inode *new_inode = data;
2627 struct inode *old_inode = lock->l_ast_data;
2628 if (!(old_inode->i_state & I_FREEING))
2629 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2630 LASSERTF(old_inode->i_state & I_FREEING,
2631 "Found existing inode %p/%lu/%u state %lu in lock: "
2632 "setting data to %p/%lu/%u\n", old_inode,
2633 old_inode->i_ino, old_inode->i_generation,
2635 new_inode, new_inode->i_ino, new_inode->i_generation);
2639 lock->l_ast_data = data;
2640 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2641 unlock_res_and_lock(lock);
2642 LDLM_LOCK_PUT(lock);
2645 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2646 ldlm_iterator_t replace, void *data)
2648 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2649 struct obd_device *obd = class_exp2obd(exp);
2651 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2655 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2661 /* The request was created before ldlm_cli_enqueue call. */
2662 if (rc == ELDLM_LOCK_ABORTED) {
2663 struct ldlm_reply *rep;
2665 /* swabbed by ldlm_cli_enqueue() */
2666 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2667 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2669 LASSERT(rep != NULL);
2670 if (rep->lock_policy_res1)
2671 rc = rep->lock_policy_res1;
2675 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2676 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2677 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2678 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2679 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2682 /* Call the update callback. */
2683 rc = oinfo->oi_cb_up(oinfo, rc);
2687 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2688 struct osc_enqueue_args *aa, int rc)
2690 int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2691 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2692 struct ldlm_lock *lock;
2694 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2696 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2698 /* Complete obtaining the lock procedure. */
2699 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2701 &aa->oa_ei->ei_flags,
2702 &lsm->lsm_oinfo[0]->loi_lvb,
2703 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2704 lustre_swab_ost_lvb,
2705 aa->oa_oi->oi_lockh, rc);
2707 /* Complete osc stuff. */
2708 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2710 /* Release the lock for async request. */
2711 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2712 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2714 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2715 aa->oa_oi->oi_lockh, req, aa);
2716 LDLM_LOCK_PUT(lock);
2720 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2721 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2722 * other synchronous requests, but holding some locks while trying to obtain
2723 * others may take a considerable amount of time in the case of an OST failure;
2724 * and when other sync requests cannot get a lock released by a client, that
2725 * client is excluded from the cluster -- such scenarios make life difficult, so
2726 * release locks just after they are obtained. */
2727 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2728 struct obd_enqueue_info *einfo)
2730 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2731 struct obd_device *obd = exp->exp_obd;
2732 struct ldlm_reply *rep;
2733 struct ptlrpc_request *req = NULL;
2734 int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2738 /* Filesystem lock extents are extended to page boundaries so that
2739 * dealing with the page cache is a little smoother. */
2740 oinfo->oi_policy.l_extent.start -=
2741 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2742 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
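/* Worked example, assuming 4K pages and CFS_PAGE_MASK == ~4095ULL: a
 * request for bytes [5000, 9000] becomes [4096, 12287] -- start loses its
 * low bits (5000 & 4095 == 904, 5000 - 904 == 4096) and end gains them
 * (9000 | 4095 == 12287), so the lock covers whole pages. */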
2744 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2747 /* Next, search for already existing extent locks that will cover us */
2748 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2749 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2752 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2755 /* I would like to be able to ASSERT here that rss <=
2756 * kms, but I can't, for reasons which are explained in
2760 /* We already have a lock, and it's referenced */
2761 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2763 /* For async requests, decref the lock. */
2764 if (einfo->ei_rqset)
2765 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2770 /* If we're trying to read, we also search for an existing PW lock. The
2771 * VFS and page cache already protect us locally, so lots of readers/
2772 * writers can share a single PW lock.
2774 * There are problems with conversion deadlocks, so instead of
2775 * converting a read lock to a write lock, we'll just enqueue a new
2778 * At some point we should cancel the read lock instead of making them
2779 * send us a blocking callback, but there are problems with canceling
2780 * locks out from other users right now, too. */
2782 if (einfo->ei_mode == LCK_PR) {
2783 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
2784 &res_id, einfo->ei_type, &oinfo->oi_policy,
2785 LCK_PW, oinfo->oi_lockh);
2787 /* FIXME: This is not incredibly elegant, but it might
2788 * be more elegant than adding another parameter to
2789 * lock_match. I want a second opinion. */
2790 /* addref the lock only if not async requests. */
2791 if (!einfo->ei_rqset)
2792 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2793 osc_set_data_with_check(oinfo->oi_lockh,
2796 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2797 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2805 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2806 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
2808 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2809 LDLM_ENQUEUE, 2, size, NULL);
2813 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2814 size[DLM_REPLY_REC_OFF] =
2815 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2816 ptlrpc_req_set_repsize(req, 3, size);
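/* The intent reply thus carries three buffers in message-offset order:
 * [MSG_PTLRPC_BODY_OFF] the ptlrpc_body, [DLM_LOCKREPLY_OFF] the
 * ldlm_reply, and [DLM_REPLY_REC_OFF] the ost_lvb whose size/blocks/mtime
 * osc_enqueue_fini() reports once the lock is granted. */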
2819 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2820 einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2822 rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
2823 &oinfo->oi_policy, einfo->ei_mode,
2824 &einfo->ei_flags, einfo->ei_cb_bl,
2825 einfo->ei_cb_cp, einfo->ei_cb_gl,
2827 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2828 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2829 lustre_swab_ost_lvb, oinfo->oi_lockh,
2830 einfo->ei_rqset ? 1 : 0);
2831 if (einfo->ei_rqset) {
2833 struct osc_enqueue_args *aa;
2834 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2835 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2840 req->rq_interpret_reply = osc_enqueue_interpret;
2841 ptlrpc_set_add_req(einfo->ei_rqset, req);
2842 } else if (intent) {
2843 ptlrpc_req_finished(req);
2848 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2850 ptlrpc_req_finished(req);
2855 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2856 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2857 int *flags, void *data, struct lustre_handle *lockh)
2859 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2860 struct obd_device *obd = exp->exp_obd;
2862 int lflags = *flags;
2865 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2867 /* Filesystem lock extents are extended to page boundaries so that
2868 * dealing with the page cache is a little smoother */
2869 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2870 policy->l_extent.end |= ~CFS_PAGE_MASK;
2872 /* Next, search for already existing extent locks that will cover us */
2873 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2874 policy, mode, lockh);
2876 //if (!(*flags & LDLM_FL_TEST_LOCK))
2877 osc_set_data_with_check(lockh, data, lflags);
2880 /* If we're trying to read, we also search for an existing PW lock. The
2881 * VFS and page cache already protect us locally, so lots of readers/
2882 * writers can share a single PW lock. */
2883 if (mode == LCK_PR) {
2884 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2886 policy, LCK_PW, lockh);
2887 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2888 /* FIXME: This is not incredibly elegant, but it might
2889 * be more elegant than adding another parameter to
2890 * lock_match. I want a second opinion. */
2891 osc_set_data_with_check(lockh, data, lflags);
2892 ldlm_lock_addref(lockh, LCK_PR);
2893 ldlm_lock_decref(lockh, LCK_PW);
2899 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2900 __u32 mode, struct lustre_handle *lockh)
2904 if (unlikely(mode == LCK_GROUP))
2905 ldlm_lock_decref_and_cancel(lockh, mode);
2907 ldlm_lock_decref(lockh, mode);
2912 static int osc_cancel_unused(struct obd_export *exp,
2913 struct lov_stripe_md *lsm, int flags, void *opaque)
2915 struct obd_device *obd = class_exp2obd(exp);
2916 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2918 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2922 static int osc_join_lru(struct obd_export *exp,
2923 struct lov_stripe_md *lsm, int join)
2925 struct obd_device *obd = class_exp2obd(exp);
2926 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2928 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2931 static int osc_statfs_interpret(struct ptlrpc_request *req,
2932 struct osc_async_args *aa, int rc)
2934 struct obd_statfs *msfs;
2940 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2941 lustre_swab_obd_statfs);
2943 CERROR("Can't unpack obd_statfs\n");
2944 GOTO(out, rc = -EPROTO);
2947 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2949 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2953 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2954 __u64 max_age, struct ptlrpc_request_set *rqset)
2956 struct ptlrpc_request *req;
2957 struct osc_async_args *aa;
2958 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
2961 /* We could possibly pass max_age in the request (as an absolute
2962 * timestamp or a "seconds.usec ago") so the target can avoid doing
2963 * extra calls into the filesystem if that isn't necessary (e.g.
2964 * during mount that would help a bit). Having relative timestamps
2965 * is not so great if request processing is slow, while absolute
2966 * timestamps are not ideal because they need time synchronization. */
2967 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2968 OST_STATFS, 1, NULL, NULL);
2972 ptlrpc_req_set_repsize(req, 2, size);
2973 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2975 req->rq_interpret_reply = osc_statfs_interpret;
2976 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2977 aa = (struct osc_async_args *)&req->rq_async_args;
2980 ptlrpc_set_add_req(rqset, req);
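/* Usage sketch: the caller fills an obd_info with oi_osfs pointing at its
 * obd_statfs buffer and oi_cb_up at a completion callback, then waits on
 * the request set (e.g. via ptlrpc_set_wait()). When the reply arrives,
 * osc_statfs_interpret() above copies the server's obd_statfs into
 * oi_osfs and invokes oi_cb_up. */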
2984 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2987 struct obd_statfs *msfs;
2988 struct ptlrpc_request *req;
2989 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
2992 /* We could possibly pass max_age in the request (as an absolute
2993 * timestamp or a "seconds.usec ago") so the target can avoid doing
2994 * extra calls into the filesystem if that isn't necessary (e.g.
2995 * during mount that would help a bit). Having relative timestamps
2996 * is not so great if request processing is slow, while absolute
2997 * timestamps are not ideal because they need time synchronization. */
2998 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2999 OST_STATFS, 1, NULL, NULL);
3003 ptlrpc_req_set_repsize(req, 2, size);
3004 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3006 rc = ptlrpc_queue_wait(req);
3010 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3011 lustre_swab_obd_statfs);
3013 CERROR("Can't unpack obd_statfs\n");
3014 GOTO(out, rc = -EPROTO);
3017 memcpy(osfs, msfs, sizeof(*osfs));
3021 ptlrpc_req_finished(req);
3025 /* Retrieve object striping information.
3027 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3028 * the maximum number of OST indices which will fit in the user buffer.
3029 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3031 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3033 struct lov_user_md lum, *lumk;
3034 int rc = 0, lum_size;
3040 if (copy_from_user(&lum, lump, sizeof(lum)))
3043 if (lum.lmm_magic != LOV_USER_MAGIC)
3046 if (lum.lmm_stripe_count > 0) {
3047 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3048 OBD_ALLOC(lumk, lum_size);
3052 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3054 lum_size = sizeof(lum);
3058 lumk->lmm_object_id = lsm->lsm_object_id;
3059 lumk->lmm_stripe_count = 1;
3061 if (copy_to_user(lump, lumk, lum_size))
3065 OBD_FREE(lumk, lum_size);
3071 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3072 void *karg, void *uarg)
3074 struct obd_device *obd = exp->exp_obd;
3075 struct obd_ioctl_data *data = karg;
3079 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3082 if (!try_module_get(THIS_MODULE)) {
3083 CERROR("Can't get module. Is it alive?");
3088 case OBD_IOC_LOV_GET_CONFIG: {
3090 struct lov_desc *desc;
3091 struct obd_uuid uuid;
3095 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3096 GOTO(out, err = -EINVAL);
3098 data = (struct obd_ioctl_data *)buf;
3100 if (sizeof(*desc) > data->ioc_inllen1) {
3101 obd_ioctl_freedata(buf, len);
3102 GOTO(out, err = -EINVAL);
3105 if (data->ioc_inllen2 < sizeof(uuid)) {
3106 obd_ioctl_freedata(buf, len);
3107 GOTO(out, err = -EINVAL);
3110 desc = (struct lov_desc *)data->ioc_inlbuf1;
3111 desc->ld_tgt_count = 1;
3112 desc->ld_active_tgt_count = 1;
3113 desc->ld_default_stripe_count = 1;
3114 desc->ld_default_stripe_size = 0;
3115 desc->ld_default_stripe_offset = 0;
3116 desc->ld_pattern = 0;
3117 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3119 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3121 err = copy_to_user((void *)uarg, buf, len);
3124 obd_ioctl_freedata(buf, len);
3127 case LL_IOC_LOV_SETSTRIPE:
3128 err = obd_alloc_memmd(exp, karg);
3132 case LL_IOC_LOV_GETSTRIPE:
3133 err = osc_getstripe(karg, uarg);
3135 case OBD_IOC_CLIENT_RECOVER:
3136 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3141 case IOC_OSC_SET_ACTIVE:
3142 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3145 case OBD_IOC_POLL_QUOTACHECK:
3146 err = lquota_poll_check(quota_interface, exp,
3147 (struct if_quotacheck *)karg);
3150 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3151 cmd, cfs_curproc_comm());
3152 GOTO(out, err = -ENOTTY);
3155 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3158 module_put(THIS_MODULE);
3163 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3164 void *key, __u32 *vallen, void *val)
3167 if (!vallen || !val)
3170 if (keylen > strlen("lock_to_stripe") &&
3171 strcmp(key, "lock_to_stripe") == 0) {
3172 __u32 *stripe = val;
3173 *vallen = sizeof(*stripe);
3176 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3177 struct ptlrpc_request *req;
3179 char *bufs[2] = { NULL, key };
3180 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3182 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3183 OST_GET_INFO, 2, size, bufs);
3187 size[REPLY_REC_OFF] = *vallen;
3188 ptlrpc_req_set_repsize(req, 2, size);
3189 rc = ptlrpc_queue_wait(req);
3193 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3194 lustre_swab_ost_last_id);
3195 if (reply == NULL) {
3196 CERROR("Can't unpack OST last ID\n");
3197 GOTO(out, rc = -EPROTO);
3199 *((obd_id *)val) = *reply;
3201 ptlrpc_req_finished(req);
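/* A caller-side sketch for this branch (illustrative only):
 *
 *     obd_id last_id;
 *     __u32 vallen = sizeof(last_id);
 *
 *     rc = obd_get_info(exp, strlen("last_id") + 1, "last_id",
 *                       &vallen, &last_id);
 *
 * Note the reply buffer is sized from *vallen, so the caller must set it
 * before the call. */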
3207 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3210 struct llog_ctxt *ctxt;
3211 struct obd_import *imp = req->rq_import;
3217 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3220 rc = llog_initiator_connect(ctxt);
3222 CERROR("cannot establish connection for "
3223 "ctxt %p: %d\n", ctxt, rc);
3226 spin_lock(&imp->imp_lock);
3227 imp->imp_server_timeout = 1;
3228 imp->imp_pingable = 1;
3229 spin_unlock(&imp->imp_lock);
3230 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3235 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3236 void *key, obd_count vallen, void *val,
3237 struct ptlrpc_request_set *set)
3239 struct ptlrpc_request *req;
3240 struct obd_device *obd = exp->exp_obd;
3241 struct obd_import *imp = class_exp2cliimp(exp);
3242 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3243 char *bufs[3] = { NULL, key, val };
3246 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3248 if (KEY_IS(KEY_NEXT_ID)) {
3249 if (vallen != sizeof(obd_id))
3251 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3252 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3253 exp->exp_obd->obd_name,
3254 obd->u.cli.cl_oscc.oscc_next_id);
3259 if (KEY_IS("unlinked")) {
3260 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3261 spin_lock(&oscc->oscc_lock);
3262 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3263 spin_unlock(&oscc->oscc_lock);
3267 if (KEY_IS(KEY_INIT_RECOV)) {
3268 if (vallen != sizeof(int))
3270 spin_lock(&imp->imp_lock);
3271 imp->imp_initial_recov = *(int *)val;
3272 spin_unlock(&imp->imp_lock);
3273 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3274 exp->exp_obd->obd_name,
3275 imp->imp_initial_recov);
3279 if (KEY_IS("checksum")) {
3280 if (vallen != sizeof(int))
3282 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3289 /* We pass all other commands directly to OST. Since nobody calls osc
3290 methods directly and everybody is supposed to go through LOV, we
3291 assume lov checked invalid values for us.
3292 The only recognised values so far are evict_by_nid and mds_conn.
3293 Even if something bad goes through, we'd get a -EINVAL from OST
3296 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3301 if (KEY_IS("mds_conn"))
3302 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3304 ptlrpc_req_set_repsize(req, 1, NULL);
3305 ptlrpc_set_add_req(set, req);
3306 ptlrpc_check_set(set);
3312 static struct llog_operations osc_size_repl_logops = {
3313 lop_cancel: llog_obd_repl_cancel
3316 static struct llog_operations osc_mds_ost_orig_logops;
3317 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3318 int count, struct llog_catid *catid,
3319 struct obd_uuid *uuid)
3324 spin_lock(&obd->obd_dev_lock);
3325 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3326 osc_mds_ost_orig_logops = llog_lvfs_ops;
3327 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3328 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3329 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3330 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3332 spin_unlock(&obd->obd_dev_lock);
3334 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3335 &catid->lci_logid, &osc_mds_ost_orig_logops);
3337 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3341 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3342 &osc_size_repl_logops);
3344 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3347 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3348 obd->obd_name, tgt->obd_name, count, catid, rc);
3349 CERROR("logid "LPX64":0x%x\n",
3350 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3355 static int osc_llog_finish(struct obd_device *obd, int count)
3357 struct llog_ctxt *ctxt;
3358 int rc = 0, rc2 = 0;
3361 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3363 rc = llog_cleanup(ctxt);
3365 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3367 rc2 = llog_cleanup(ctxt);
3374 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3375 struct obd_uuid *cluuid,
3376 struct obd_connect_data *data)
3378 struct client_obd *cli = &obd->u.cli;
3380 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3383 client_obd_list_lock(&cli->cl_loi_list_lock);
3384 data->ocd_grant = cli->cl_avail_grant ?:
3385 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
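/* e.g. with cl_max_pages_per_rpc = 256 and 4K pages this asks for
 * 2 * 256 << 12 = 2MB of grant when none was carried over -- enough
 * for two full-sized write RPCs in flight (values illustrative). */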
3386 lost_grant = cli->cl_lost_grant;
3387 cli->cl_lost_grant = 0;
3388 client_obd_list_unlock(&cli->cl_loi_list_lock);
3390 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3391 "cl_lost_grant: %ld\n", data->ocd_grant,
3392 cli->cl_avail_grant, lost_grant);
3393 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3394 " ocd_grant: %d\n", data->ocd_connect_flags,
3395 data->ocd_version, data->ocd_grant);
3401 static int osc_disconnect(struct obd_export *exp)
3403 struct obd_device *obd = class_exp2obd(exp);
3404 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3407 if (obd->u.cli.cl_conn_count == 1)
3408 /* flush any remaining cancel messages out to the target */
3409 llog_sync(ctxt, exp);
3411 rc = client_disconnect_export(exp);
3415 static int osc_import_event(struct obd_device *obd,
3416 struct obd_import *imp,
3417 enum obd_import_event event)
3419 struct client_obd *cli;
3423 LASSERT(imp->imp_obd == obd);
3426 case IMP_EVENT_DISCON: {
3427 /* Only do this for the MDS OSCs */
3428 if (imp->imp_server_timeout) {
3429 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3431 spin_lock(&oscc->oscc_lock);
3432 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3433 spin_unlock(&oscc->oscc_lock);
3438 case IMP_EVENT_INACTIVE: {
3439 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3442 case IMP_EVENT_INVALIDATE: {
3443 struct ldlm_namespace *ns = obd->obd_namespace;
3447 client_obd_list_lock(&cli->cl_loi_list_lock);
3448 cli->cl_avail_grant = 0;
3449 cli->cl_lost_grant = 0;
3450 /* all pages go to failing rpcs due to the invalid import */
3451 osc_check_rpcs(cli);
3452 client_obd_list_unlock(&cli->cl_loi_list_lock);
3454 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3458 case IMP_EVENT_ACTIVE: {
3459 /* Only do this for the MDS OSCs */
3460 if (imp->imp_server_timeout) {
3461 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3463 spin_lock(&oscc->oscc_lock);
3464 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3465 spin_unlock(&oscc->oscc_lock);
3467 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3470 case IMP_EVENT_OCD: {
3471 struct obd_connect_data *ocd = &imp->imp_connect_data;
3473 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3474 osc_init_grant(&obd->u.cli, ocd);
3477 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3478 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3480 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3484 CERROR("Unknown import event %d\n", event);
3490 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3496 rc = ptlrpcd_addref();
3500 rc = client_obd_setup(obd, len, buf);
3504 struct lprocfs_static_vars lvars;
3505 struct client_obd *cli = &obd->u.cli;
3507 lprocfs_init_vars(osc, &lvars);
3508 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3509 lproc_osc_attach_seqstat(obd);
3510 ptlrpc_lprocfs_register_obd(obd);
3514 /* We need to allocate a few extra requests, because
3515 brw_interpret_oap tries to create new requests before freeing
3516 previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
3517 but that is probably too much wasted RAM in practice, so 2 extra
3518 (e.g. a pool of 10 for 8 RPCs in flight) should still work. */
3519 cli->cl_import->imp_rq_pool =
3520 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3522 ptlrpc_add_rqs_to_pool);
3528 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3534 case OBD_CLEANUP_EARLY: {
3535 struct obd_import *imp;
3536 imp = obd->u.cli.cl_import;
3537 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3538 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3539 ptlrpc_deactivate_import(imp);
3542 case OBD_CLEANUP_EXPORTS: {
3543 /* If we set up but never connected, the
3544 client import will not have been cleaned. */
3545 if (obd->u.cli.cl_import) {
3546 struct obd_import *imp;
3547 imp = obd->u.cli.cl_import;
3548 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3550 ptlrpc_invalidate_import(imp);
3551 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3552 class_destroy_import(imp);
3553 obd->u.cli.cl_import = NULL;
3557 case OBD_CLEANUP_SELF_EXP:
3558 rc = obd_llog_finish(obd, 0);
3560 CERROR("failed to cleanup llogging subsystems\n");
3562 case OBD_CLEANUP_OBD:
3568 int osc_cleanup(struct obd_device *obd)
3570 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3574 ptlrpc_lprocfs_unregister_obd(obd);
3575 lprocfs_obd_cleanup(obd);
3577 spin_lock(&oscc->oscc_lock);
3578 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3579 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3580 spin_unlock(&oscc->oscc_lock);
3582 /* free memory of osc quota cache */
3583 lquota_cleanup(quota_interface, obd);
3585 rc = client_obd_cleanup(obd);
3591 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3593 struct lustre_cfg *lcfg = buf;
3594 struct lprocfs_static_vars lvars;
3597 lprocfs_init_vars(osc, &lvars);
3599 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3603 struct obd_ops osc_obd_ops = {
3604 .o_owner = THIS_MODULE,
3605 .o_setup = osc_setup,
3606 .o_precleanup = osc_precleanup,
3607 .o_cleanup = osc_cleanup,
3608 .o_add_conn = client_import_add_conn,
3609 .o_del_conn = client_import_del_conn,
3610 .o_connect = client_connect_import,
3611 .o_reconnect = osc_reconnect,
3612 .o_disconnect = osc_disconnect,
3613 .o_statfs = osc_statfs,
3614 .o_statfs_async = osc_statfs_async,
3615 .o_packmd = osc_packmd,
3616 .o_unpackmd = osc_unpackmd,
3617 .o_create = osc_create,
3618 .o_destroy = osc_destroy,
3619 .o_getattr = osc_getattr,
3620 .o_getattr_async = osc_getattr_async,
3621 .o_setattr = osc_setattr,
3622 .o_setattr_async = osc_setattr_async,
3624 .o_brw_async = osc_brw_async,
3625 .o_prep_async_page = osc_prep_async_page,
3626 .o_queue_async_io = osc_queue_async_io,
3627 .o_set_async_flags = osc_set_async_flags,
3628 .o_queue_group_io = osc_queue_group_io,
3629 .o_trigger_group_io = osc_trigger_group_io,
3630 .o_teardown_async_page = osc_teardown_async_page,
3631 .o_punch = osc_punch,
3633 .o_enqueue = osc_enqueue,
3634 .o_match = osc_match,
3635 .o_change_cbdata = osc_change_cbdata,
3636 .o_cancel = osc_cancel,
3637 .o_cancel_unused = osc_cancel_unused,
3638 .o_join_lru = osc_join_lru,
3639 .o_iocontrol = osc_iocontrol,
3640 .o_get_info = osc_get_info,
3641 .o_set_info_async = osc_set_info_async,
3642 .o_import_event = osc_import_event,
3643 .o_llog_init = osc_llog_init,
3644 .o_llog_finish = osc_llog_finish,
3645 .o_process_config = osc_process_config,
3648 int __init osc_init(void)
3650 struct lprocfs_static_vars lvars;
3654 lprocfs_init_vars(osc, &lvars);
3656 request_module("lquota");
3657 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3658 lquota_init(quota_interface);
3659 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3661 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3664 if (quota_interface)
3665 PORTAL_SYMBOL_PUT(osc_quota_interface);
3673 static void /*__exit*/ osc_exit(void)
3675 lquota_exit(quota_interface);
3676 if (quota_interface)
3677 PORTAL_SYMBOL_PUT(osc_quota_interface);
3679 class_unregister_type(LUSTRE_OSC_NAME);
3682 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3683 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3684 MODULE_LICENSE("GPL");
3686 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);