1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 atomic_t osc_resend_time;
72 /* Pack OSC object metadata for disk storage (LE byte order). */
73 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
74 struct lov_stripe_md *lsm)
79 lmm_size = sizeof(**lmmp);
84 OBD_FREE(*lmmp, lmm_size);
90 OBD_ALLOC(*lmmp, lmm_size);
96 LASSERT(lsm->lsm_object_id);
97 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105 struct lov_mds_md *lmm, int lmm_bytes)
111 if (lmm_bytes < sizeof (*lmm)) {
112 CERROR("lov_mds_md too small: %d, need %d\n",
113 lmm_bytes, (int)sizeof(*lmm));
116 /* XXX LOV_MAGIC etc check? */
118 if (lmm->lmm_object_id == 0) {
119 CERROR("lov_mds_md: zero lmm_object_id\n");
124 lsm_size = lov_stripe_md_size(1);
128 if (*lsmp != NULL && lmm == NULL) {
129 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130 OBD_FREE(*lsmp, lsm_size);
136 OBD_ALLOC(*lsmp, lsm_size);
139 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141 OBD_FREE(*lsmp, lsm_size);
144 loi_init((*lsmp)->lsm_oinfo[0]);
148 /* XXX zero *lsmp? */
149 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150 LASSERT((*lsmp)->lsm_object_id);
153 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
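/* Reply callback for async getattr: unpack the ost_body from the reply,
 * copy the returned attributes into the caller's obdo, fill in the blocksize
 * (which the OST does not send), and invoke the oi_cb_up completion hook. */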
158 static int osc_getattr_interpret(struct ptlrpc_request *req,
159 struct osc_async_args *aa, int rc)
161 struct ost_body *body;
167 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
168 lustre_swab_ost_body);
170 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
171 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
173 /* This should really be sent by the OST */
174 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
175 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
177 CERROR("can't unpack ost_body\n");
179 aa->aa_oi->oi_oa->o_valid = 0;
182 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
186 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
187 struct ptlrpc_request_set *set)
189 struct ptlrpc_request *req;
190 struct ost_body *body;
191 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
192 struct osc_async_args *aa;
195 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
196 OST_GETATTR, 2, size,NULL);
200 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
201 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
203 ptlrpc_req_set_repsize(req, 2, size);
204 req->rq_interpret_reply = osc_getattr_interpret;
206 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
207 aa = (struct osc_async_args *)&req->rq_async_args;
210 ptlrpc_set_add_req(set, req);
214 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
216 struct ptlrpc_request *req;
217 struct ost_body *body;
218 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
221 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
222 OST_GETATTR, 2, size, NULL);
226 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
227 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
229 ptlrpc_req_set_repsize(req, 2, size);
231 rc = ptlrpc_queue_wait(req);
233 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
237 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
238 lustre_swab_ost_body);
240 CERROR ("can't unpack ost_body\n");
241 GOTO (out, rc = -EPROTO);
244 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
247 /* This should really be sent by the OST */
248 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
249 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
253 ptlrpc_req_finished(req);
257 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
258 struct obd_trans_info *oti)
260 struct ptlrpc_request *req;
261 struct ost_body *body;
262 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
265 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
266 OST_SETATTR, 2, size, NULL);
270 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
271 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
273 ptlrpc_req_set_repsize(req, 2, size);
275 rc = ptlrpc_queue_wait(req);
279 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
280 lustre_swab_ost_body);
282 GOTO(out, rc = -EPROTO);
284 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
288 ptlrpc_req_finished(req);
292 static int osc_setattr_interpret(struct ptlrpc_request *req,
293 struct osc_async_args *aa, int rc)
295 struct ost_body *body;
301 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
302 lustre_swab_ost_body);
304 CERROR("can't unpack ost_body\n");
305 GOTO(out, rc = -EPROTO);
308 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
310 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
314 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
315 struct obd_trans_info *oti,
316 struct ptlrpc_request_set *rqset)
318 struct ptlrpc_request *req;
319 struct ost_body *body;
320 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
321 struct osc_async_args *aa;
324 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
325 OST_SETATTR, 2, size, NULL);
329 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
331 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
333 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
334 sizeof(*oti->oti_logcookies));
337 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
338 ptlrpc_req_set_repsize(req, 2, size);
339 /* do mds to ost setattr asynchronously */
341 /* Do not wait for response. */
342 ptlrpcd_add_req(req);
344 req->rq_interpret_reply = osc_setattr_interpret;
346 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
347 aa = (struct osc_async_args *)&req->rq_async_args;
350 ptlrpc_set_add_req(rqset, req);
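/* Synchronously create an object on the OST: allocate striping metadata if
 * the caller did not pass any, send an OST_CREATE request, and copy the
 * returned attributes (object id, transno, llog cookie) back to the caller. */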
356 int osc_real_create(struct obd_export *exp, struct obdo *oa,
357 struct lov_stripe_md **ea, struct obd_trans_info *oti)
359 struct ptlrpc_request *req;
360 struct ost_body *body;
361 struct lov_stripe_md *lsm;
362 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
370 rc = obd_alloc_memmd(exp, &lsm);
375 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
376 OST_CREATE, 2, size, NULL);
378 GOTO(out, rc = -ENOMEM);
380 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
381 memcpy(&body->oa, oa, sizeof(body->oa));
383 ptlrpc_req_set_repsize(req, 2, size);
384 if (oa->o_valid & OBD_MD_FLINLINE) {
385 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
386 oa->o_flags == OBD_FL_DELORPHAN);
388 "delorphan from OST integration");
389 /* Don't resend the delorphan req */
390 req->rq_no_resend = req->rq_no_delay = 1;
393 rc = ptlrpc_queue_wait(req);
397 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
398 lustre_swab_ost_body);
400 CERROR ("can't unpack ost_body\n");
401 GOTO (out_req, rc = -EPROTO);
404 memcpy(oa, &body->oa, sizeof(*oa));
406 /* This should really be sent by the OST */
407 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
408 oa->o_valid |= OBD_MD_FLBLKSZ;
410 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
411 * have valid lsm_oinfo data structs, so don't go touching that.
412 * This needs to be fixed in a big way.
414 lsm->lsm_object_id = oa->o_id;
418 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
420 if (oa->o_valid & OBD_MD_FLCOOKIE) {
421 if (!oti->oti_logcookies)
422 oti_alloc_cookies(oti, 1);
423 memcpy(oti->oti_logcookies, obdo_logcookie(oa),
424 sizeof(oti->oti_onecookie));
428 CDEBUG(D_HA, "transno: "LPD64"\n",
429 lustre_msg_get_transno(req->rq_repmsg));
431 ptlrpc_req_finished(req);
434 obd_free_memmd(exp, &lsm);
438 static int osc_punch_interpret(struct ptlrpc_request *req,
439 struct osc_async_args *aa, int rc)
441 struct ost_body *body;
447 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
448 lustre_swab_ost_body);
450 CERROR ("can't unpack ost_body\n");
451 GOTO(out, rc = -EPROTO);
454 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
456 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
460 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
461 struct obd_trans_info *oti,
462 struct ptlrpc_request_set *rqset)
464 struct ptlrpc_request *req;
465 struct osc_async_args *aa;
466 struct ost_body *body;
467 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
475 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
476 OST_PUNCH, 2, size, NULL);
480 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
482 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
483 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
485 /* overload the size and blocks fields in the oa with start/end */
486 body->oa.o_size = oinfo->oi_policy.l_extent.start;
487 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
488 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
490 ptlrpc_req_set_repsize(req, 2, size);
492 req->rq_interpret_reply = osc_punch_interpret;
493 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
494 aa = (struct osc_async_args *)&req->rq_async_args;
496 ptlrpc_set_add_req(rqset, req);
501 static int osc_sync(struct obd_export *exp, struct obdo *oa,
502 struct lov_stripe_md *md, obd_size start, obd_size end)
504 struct ptlrpc_request *req;
505 struct ost_body *body;
506 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
514 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
515 OST_SYNC, 2, size, NULL);
519 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
520 memcpy(&body->oa, oa, sizeof(*oa));
522 /* overload the size and blocks fields in the oa with start/end */
523 body->oa.o_size = start;
524 body->oa.o_blocks = end;
525 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
527 ptlrpc_req_set_repsize(req, 2, size);
529 rc = ptlrpc_queue_wait(req);
533 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
534 lustre_swab_ost_body);
536 CERROR ("can't unpack ost_body\n");
537 GOTO (out, rc = -EPROTO);
540 memcpy(oa, &body->oa, sizeof(*oa));
544 ptlrpc_req_finished(req);
548 /* Find and locally cancel locks matched by @mode in the resource found by
549 * @objid. Found locks are added to the @cancels list. Returns the number of
550 * locks added to the @cancels list. */
551 static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
552 struct list_head *cancels, ldlm_mode_t mode,
555 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
556 struct ldlm_res_id res_id = { .name = { objid } };
557 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
564 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565 lock_flags, 0, NULL);
566 ldlm_resource_putref(res);
570 /* Destroy requests can always be async on the client, and we don't even really
571 * care about the return code since the client cannot do anything at all about a destroy failure.
573 * When the MDS is unlinking a filename, it saves the file objects into a
574 * recovery llog, and these object records are cancelled when the OST reports
575 * they were destroyed and sync'd to disk (i.e. transaction committed).
576 * If the client dies, or the OST is down when the object should be destroyed,
577 * the records are not cancelled, and when the OST reconnects to the MDS next,
578 * it will retrieve the llog unlink logs and then send the log cancellation
579 * cookies to the MDS after committing destroy transactions. */
580 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
581 struct lov_stripe_md *ea, struct obd_trans_info *oti,
582 struct obd_export *md_export)
584 CFS_LIST_HEAD(cancels);
585 struct ptlrpc_request *req;
586 struct ost_body *body;
587 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
588 int count, bufcount = 2;
596 count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
597 LDLM_FL_DISCARD_DATA);
598 if (exp_connect_cancelset(exp) && count) {
600 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
602 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
603 OST_DESTROY, bufcount, size, NULL);
605 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1);
607 ldlm_lock_list_put(&cancels, l_bl_ast, count);
612 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
614 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
616 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
617 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
618 sizeof(*oti->oti_logcookies));
621 memcpy(&body->oa, oa, sizeof(*oa));
622 ptlrpc_req_set_repsize(req, 2, size);
624 ptlrpcd_add_req(req);
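/* Fill in the obdo's cache accounting fields (dirty pages, available grant,
 * lost grant) so that every BRW RPC also reports the client's cache state to
 * the OST. Sanity-check the dirty counters against the per-OSC and
 * system-wide limits while holding the loi list lock. */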
628 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
631 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
633 LASSERT(!(oa->o_valid & bits));
636 client_obd_list_lock(&cli->cl_loi_list_lock);
637 oa->o_dirty = cli->cl_dirty;
638 if (cli->cl_dirty > cli->cl_dirty_max) {
639 CERROR("dirty %lu > dirty_max %lu\n",
640 cli->cl_dirty, cli->cl_dirty_max);
642 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
643 CERROR("dirty %d > system dirty_max %d\n",
644 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
646 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
647 CERROR("dirty %lu - dirty_max %lu too big???\n",
648 cli->cl_dirty, cli->cl_dirty_max);
651 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
652 (cli->cl_max_rpcs_in_flight + 1);
653 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
655 oa->o_grant = cli->cl_avail_grant;
656 oa->o_dropped = cli->cl_lost_grant;
657 cli->cl_lost_grant = 0;
658 client_obd_list_unlock(&cli->cl_loi_list_lock);
659 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
660 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
663 /* caller must hold loi_list_lock */
664 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
666 atomic_inc(&obd_dirty_pages);
667 cli->cl_dirty += CFS_PAGE_SIZE;
668 cli->cl_avail_grant -= CFS_PAGE_SIZE;
669 pga->flag |= OBD_BRW_FROM_GRANT;
670 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
671 CFS_PAGE_SIZE, pga, pga->pg);
672 LASSERT(cli->cl_avail_grant >= 0);
675 /* the companion to osc_consume_write_grant, called when a brw has completed.
676 * must be called with the loi lock held. */
677 static void osc_release_write_grant(struct client_obd *cli,
678 struct brw_page *pga, int sent)
680 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
683 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
688 pga->flag &= ~OBD_BRW_FROM_GRANT;
689 atomic_dec(&obd_dirty_pages);
690 cli->cl_dirty -= CFS_PAGE_SIZE;
692 cli->cl_lost_grant += CFS_PAGE_SIZE;
693 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
694 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
695 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
696 /* For short writes we shouldn't count parts of pages that
697 * span a whole block on the OST side, or our accounting goes
698 * wrong. Should match the code in filter_grant_check. */
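/* Hypothetical example, assuming 1024-byte OST blocks and 4096-byte pages:
 * a 512-byte write at page offset 256 only touches one block, so count is
 * rounded out to 1024 and the remaining 3072 bytes of the page's grant are
 * added to cl_lost_grant below. */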
699 int offset = pga->off & ~CFS_PAGE_MASK;
700 int count = pga->count + (offset & (blocksize - 1));
701 int end = (offset + pga->count) & (blocksize - 1);
703 count += blocksize - end;
705 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
706 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
707 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
708 cli->cl_avail_grant, cli->cl_dirty);
714 static unsigned long rpcs_in_flight(struct client_obd *cli)
716 return cli->cl_r_in_flight + cli->cl_w_in_flight;
719 /* caller must hold loi_list_lock */
720 void osc_wake_cache_waiters(struct client_obd *cli)
722 struct list_head *l, *tmp;
723 struct osc_cache_waiter *ocw;
726 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
727 /* if we can't dirty more, we must wait until some is written */
728 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
729 ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
730 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
731 "osc max %ld, sys max %d\n", cli->cl_dirty,
732 cli->cl_dirty_max, obd_max_dirty_pages);
736 /* if there is still dirty cache but no grant, wait for pending RPCs that
737 * may yet return us some grant before doing sync writes */
738 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
739 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
740 cli->cl_w_in_flight);
744 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
745 list_del_init(&ocw->ocw_entry);
746 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
747 /* no more RPCs in flight to return grant, do sync IO */
748 ocw->ocw_rc = -EDQUOT;
749 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
751 osc_consume_write_grant(cli,
752 &ocw->ocw_oap->oap_brw_page);
755 cfs_waitq_signal(&ocw->ocw_waitq);
761 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
763 client_obd_list_lock(&cli->cl_loi_list_lock);
764 cli->cl_avail_grant = ocd->ocd_grant;
765 client_obd_list_unlock(&cli->cl_loi_list_lock);
767 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
768 cli->cl_avail_grant, cli->cl_lost_grant);
769 LASSERT(cli->cl_avail_grant >= 0);
772 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
774 client_obd_list_lock(&cli->cl_loi_list_lock);
775 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
776 cli->cl_avail_grant += body->oa.o_grant;
777 /* waiters are woken in brw_interpret_oap */
778 client_obd_list_unlock(&cli->cl_loi_list_lock);
781 /* We assume that the reason this OSC got a short read is that it read
782 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
783 * via the LOV, and it _knows_ it's reading inside the file, it's just that
784 * this stripe never got written at or beyond this stripe offset yet. */
785 static void handle_short_read(int nob_read, obd_count page_count,
786 struct brw_page **pga)
791 /* skip bytes read OK */
792 while (nob_read > 0) {
793 LASSERT (page_count > 0);
795 if (pga[i]->count > nob_read) {
796 /* EOF inside this page */
797 ptr = cfs_kmap(pga[i]->pg) +
798 (pga[i]->off & ~CFS_PAGE_MASK);
799 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
800 cfs_kunmap(pga[i]->pg);
806 nob_read -= pga[i]->count;
811 /* zero remaining pages */
812 while (page_count-- > 0) {
813 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
814 memset(ptr, 0, pga[i]->count);
815 cfs_kunmap(pga[i]->pg);
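/* Validate the per-niobuf return codes in a BRW write reply: fail if the rc
 * vector is missing, if any niobuf reported an error or an unexpected value,
 * or if the bulk did not transfer exactly the number of bytes requested. */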
820 static int check_write_rcs(struct ptlrpc_request *req,
821 int requested_nob, int niocount,
822 obd_count page_count, struct brw_page **pga)
826 /* return error if any niobuf was in error */
827 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
828 sizeof(*remote_rcs) * niocount, NULL);
829 if (remote_rcs == NULL) {
830 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
833 if (lustre_msg_swabbed(req->rq_repmsg))
834 for (i = 0; i < niocount; i++)
835 __swab32s(&remote_rcs[i]);
837 for (i = 0; i < niocount; i++) {
838 if (remote_rcs[i] < 0)
839 return(remote_rcs[i]);
841 if (remote_rcs[i] != 0) {
842 CERROR("rc[%d] invalid (%d) req %p\n",
843 i, remote_rcs[i], req);
848 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
849 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
850 requested_nob, req->rq_bulk->bd_nob_transferred);
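/* Two brw pages can be merged into a single niobuf only if they carry the
 * same flags and are contiguous in the file. */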
857 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
859 if (p1->flag != p2->flag) {
860 unsigned mask = ~OBD_BRW_FROM_GRANT;
862 /* warn if we try to combine flags that we don't know to be
864 if ((p1->flag & mask) != (p2->flag & mask))
865 CERROR("is it ok to have flags 0x%x and 0x%x in the "
866 "same brw?\n", p1->flag, p2->flag);
870 return (p1->off + p1->count == p2->off);
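/* Compute a crc32 checksum over the first @nob bytes of the bulk pages.
 * Fault-injection hooks can corrupt the page data (to simulate an OST->client
 * error) or return a deliberately wrong checksum on the send path, to
 * exercise the checksum error handling. */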
873 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
874 struct brw_page **pga)
879 LASSERT (pg_count > 0);
880 while (nob > 0 && pg_count > 0) {
881 char *ptr = cfs_kmap(pga[i]->pg);
882 int off = pga[i]->off & ~CFS_PAGE_MASK;
883 int count = pga[i]->count > nob ? nob : pga[i]->count;
885 /* corrupt the data before we compute the checksum, to
886 * simulate an OST->client data error */
887 if (i == 0 &&OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
888 memcpy(ptr + off, "bad1", min(4, nob));
889 cksum = crc32_le(cksum, ptr + off, count);
890 cfs_kunmap(pga[i]->pg);
891 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
894 nob -= pga[i]->count;
898 /* For sending we only compute the wrong checksum instead
899 * of corrupting the data so it is still correct on a redo */
900 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
906 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
907 struct lov_stripe_md *lsm, obd_count page_count,
908 struct brw_page **pga,
909 struct ptlrpc_request **reqp)
911 struct ptlrpc_request *req;
912 struct ptlrpc_bulk_desc *desc;
913 struct ost_body *body;
914 struct obd_ioobj *ioobj;
915 struct niobuf_remote *niobuf;
916 int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
917 int niocount, i, requested_nob, opc, rc;
918 struct ptlrpc_request_pool *pool;
919 struct osc_brw_async_args *aa;
922 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
923 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
925 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
926 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
928 for (niocount = i = 1; i < page_count; i++) {
929 if (!can_merge_pages(pga[i - 1], pga[i]))
933 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
934 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
936 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
941 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
943 if (opc == OST_WRITE)
944 desc = ptlrpc_prep_bulk_imp (req, page_count,
945 BULK_GET_SOURCE, OST_BULK_PORTAL);
947 desc = ptlrpc_prep_bulk_imp (req, page_count,
948 BULK_PUT_SINK, OST_BULK_PORTAL);
950 GOTO(out, rc = -ENOMEM);
951 /* NB request now owns desc and will free it when it gets freed */
953 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
954 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
955 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
956 niocount * sizeof(*niobuf));
958 memcpy(&body->oa, oa, sizeof(*oa));
960 obdo_to_ioobj(oa, ioobj);
961 ioobj->ioo_bufcnt = niocount;
963 LASSERT (page_count > 0);
964 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
965 struct brw_page *pg = pga[i];
966 struct brw_page *pg_prev = pga[i - 1];
968 LASSERT(pg->count > 0);
969 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
970 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
973 LASSERTF(i == 0 || pg->off > pg_prev->off,
974 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
975 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
977 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
978 pg_prev->pg, page_private(pg_prev->pg),
979 pg_prev->pg->index, pg_prev->off);
981 LASSERTF(i == 0 || pg->off > pg_prev->off,
982 "i %d p_c %u\n", i, page_count);
984 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
985 (pg->flag & OBD_BRW_SRVLOCK));
987 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
989 requested_nob += pg->count;
991 if (i > 0 && can_merge_pages(pg_prev, pg)) {
993 niobuf->len += pg->count;
995 niobuf->offset = pg->off;
996 niobuf->len = pg->count;
997 niobuf->flags = pg->flag;
1001 LASSERT((void *)(niobuf - niocount) ==
1002 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1003 niocount * sizeof(*niobuf)));
1004 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1006 /* size[REQ_REC_OFF] still sizeof (*body) */
1007 if (opc == OST_WRITE) {
1008 if (unlikely(cli->cl_checksum)) {
1009 body->oa.o_valid |= OBD_MD_FLCKSUM;
1010 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1012 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1014 /* save this in 'oa', too, for later checking */
1015 oa->o_valid |= OBD_MD_FLCKSUM;
1017 /* clear out the checksum flag, in case this is a
1018 * resend but cl_checksum is no longer set. b=11238 */
1019 oa->o_valid &= ~OBD_MD_FLCKSUM;
1021 oa->o_cksum = body->oa.o_cksum;
1022 /* 1 RC per niobuf */
1023 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1024 ptlrpc_req_set_repsize(req, 3, size);
1026 if (unlikely(cli->cl_checksum))
1027 body->oa.o_valid |= OBD_MD_FLCKSUM;
1028 /* 1 RC for the whole I/O */
1029 ptlrpc_req_set_repsize(req, 2, size);
1032 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1033 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1035 aa->aa_requested_nob = requested_nob;
1036 aa->aa_nio_count = niocount;
1037 aa->aa_page_count = page_count;
1038 aa->aa_start_send = cfs_time_current();
1041 INIT_LIST_HEAD(&aa->aa_oaps);
1047 ptlrpc_req_finished (req);
1051 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1052 __u32 client_cksum, __u32 server_cksum, int nob,
1053 obd_count page_count, struct brw_page **pga)
1058 if (server_cksum == client_cksum) {
1059 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1063 new_cksum = osc_checksum_bulk(nob, page_count, pga);
1065 if (new_cksum == server_cksum)
1066 msg = "changed on the client after we checksummed it - "
1067 "likely false positive due to mmap IO (bug 11742)";
1068 else if (new_cksum == client_cksum)
1069 msg = "changed in transit before arrival at OST";
1071 msg = "changed in transit AND doesn't match the original - "
1072 "likely false positive due to mmap IO (bug 11742)";
1074 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1075 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1076 "["LPU64"-"LPU64"]\n",
1077 msg, libcfs_nid2str(peer->nid),
1078 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1079 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1082 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1084 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1085 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1086 client_cksum, server_cksum, new_cksum);
1091 /* Note rc enters this function as number of bytes transferred */
1092 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1094 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1095 const lnet_process_id_t *peer =
1096 &req->rq_import->imp_connection->c_peer;
1097 struct client_obd *cli = aa->aa_cli;
1098 struct ost_body *body;
1099 __u32 client_cksum = 0;
1102 if (rc < 0 && rc != -EDQUOT)
1105 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1106 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1107 lustre_swab_ost_body);
1109 CERROR ("Can't unpack body\n");
1113 /* set/clear over quota flag for a uid/gid */
1114 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1115 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1116 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1117 body->oa.o_gid, body->oa.o_valid,
1123 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1124 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1126 osc_update_grant(cli, body);
1128 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1130 CERROR ("Unexpected +ve rc %d\n", rc);
1133 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1135 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1137 check_write_checksum(&body->oa, peer, client_cksum,
1139 aa->aa_requested_nob,
1144 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1145 aa->aa_page_count, aa->aa_ppga);
1149 /* The rest of this function executes only for OST_READs */
1150 if (rc > aa->aa_requested_nob) {
1151 CERROR("Unexpected rc %d (%d requested)\n", rc,
1152 aa->aa_requested_nob);
1156 if (rc != req->rq_bulk->bd_nob_transferred) {
1157 CERROR ("Unexpected rc %d (%d transferred)\n",
1158 rc, req->rq_bulk->bd_nob_transferred);
1162 if (rc < aa->aa_requested_nob)
1163 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1165 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1166 static int cksum_counter;
1167 __u32 server_cksum = body->oa.o_cksum;
1171 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1174 if (peer->nid == req->rq_bulk->bd_sender) {
1178 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1181 if (server_cksum == ~0 && rc > 0) {
1182 CERROR("Protocol error: server %s set the 'checksum' "
1183 "bit, but didn't send a checksum. Not fatal, "
1184 "but please tell CFS.\n",
1185 libcfs_nid2str(peer->nid));
1186 } else if (server_cksum != client_cksum) {
1187 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1188 "%s%s%s inum "LPU64"/"LPU64" object "
1189 LPU64"/"LPU64" extent "
1190 "["LPU64"-"LPU64"]\n",
1191 req->rq_import->imp_obd->obd_name,
1192 libcfs_nid2str(peer->nid),
1194 body->oa.o_valid & OBD_MD_FLFID ?
1195 body->oa.o_fid : (__u64)0,
1196 body->oa.o_valid & OBD_MD_FLFID ?
1197 body->oa.o_generation :(__u64)0,
1199 body->oa.o_valid & OBD_MD_FLGROUP ?
1200 body->oa.o_gr : (__u64)0,
1201 aa->aa_ppga[0]->off,
1202 aa->aa_ppga[aa->aa_page_count-1]->off +
1203 aa->aa_ppga[aa->aa_page_count-1]->count -
1205 CERROR("client %x, server %x\n",
1206 client_cksum, server_cksum);
1208 aa->aa_oa->o_cksum = client_cksum;
1212 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1215 } else if (unlikely(client_cksum)) {
1216 static int cksum_missed;
1219 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1220 CERROR("Checksum %u requested from %s but not sent\n",
1221 cksum_missed, libcfs_nid2str(peer->nid));
1227 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1232 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1233 struct lov_stripe_md *lsm,
1234 obd_count page_count, struct brw_page **pga)
1236 struct ptlrpc_request *request;
1238 cfs_time_t start_send = cfs_time_current();
1242 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1243 page_count, pga, &request);
1247 rc = ptlrpc_queue_wait(request);
1249 if (rc == -ETIMEDOUT && request->rq_resend) {
1250 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1251 ptlrpc_req_finished(request);
1255 rc = osc_brw_fini_request(request, rc);
1257 ptlrpc_req_finished(request);
1258 if (osc_recoverable_error(rc)) {
1259 if (!osc_should_resend(start_send)) {
1260 CERROR("too many resend retries, returning error\n");
1268 int osc_brw_redo_request(struct ptlrpc_request *request,
1269 struct osc_brw_async_args *aa)
1271 struct ptlrpc_request *new_req;
1272 struct ptlrpc_request_set *set = request->rq_set;
1273 struct osc_brw_async_args *new_aa;
1274 struct osc_async_page *oap;
1278 if (!osc_should_resend(aa->aa_start_send)) {
1279 CERROR("too many resend retries, returning error\n");
1283 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1284 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1285 if (oap->oap_request != NULL) {
1286 LASSERTF(request == oap->oap_request,
1287 "request %p != oap_request %p\n",
1288 request, oap->oap_request);
1289 if (oap->oap_interrupted) {
1290 ptlrpc_mark_interrupted(oap->oap_request);
1299 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1300 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1301 aa->aa_cli, aa->aa_oa,
1302 NULL /* lsm unused by osc currently */,
1303 aa->aa_page_count, aa->aa_ppga, &new_req);
1307 /* New request takes over pga and oaps from old request.
1308 * Note that copying a list_head doesn't work, need to move it... */
1309 new_req->rq_interpret_reply = request->rq_interpret_reply;
1310 new_req->rq_async_args = request->rq_async_args;
1311 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1312 INIT_LIST_HEAD(&new_aa->aa_oaps);
1313 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1314 INIT_LIST_HEAD(&aa->aa_oaps);
1316 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1317 if (oap->oap_request) {
1318 ptlrpc_req_finished(oap->oap_request);
1319 oap->oap_request = ptlrpc_request_addref(new_req);
1323 ptlrpc_set_add_req(set, new_req);
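/* Completion callback for the plain async BRW path: finish the request,
 * resend it if the error is recoverable, then drop the write grant for each
 * page and free the page array. */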
1328 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1330 struct osc_brw_async_args *aa = data;
1335 rc = osc_brw_fini_request(request, rc);
1336 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1337 if (osc_recoverable_error(rc)) {
1338 rc = osc_brw_redo_request(request, aa);
1342 if ((rc >= 0) && request->rq_set && request->rq_set->set_countp)
1343 atomic_add(nob, (atomic_t *)request->rq_set->set_countp);
1345 spin_lock(&aa->aa_cli->cl_loi_list_lock);
1346 for (i = 0; i < aa->aa_page_count; i++)
1347 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1348 spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1350 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
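/* Build a BRW request and add it to @set without waiting for it. Write grant
 * is consumed for each page up front (even for sync writes, so the OST grant
 * accounting stays correct) and released again if the request cannot be
 * built. */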
1355 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1356 struct lov_stripe_md *lsm, obd_count page_count,
1357 struct brw_page **pga, struct ptlrpc_request_set *set)
1359 struct ptlrpc_request *request;
1360 struct client_obd *cli = &exp->exp_obd->u.cli;
1364 /* Consume write credits even if doing a sync write -
1365 * otherwise we may run out of space on OST due to grant. */
1366 if (cmd == OBD_BRW_WRITE) {
1367 spin_lock(&cli->cl_loi_list_lock);
1368 for (i = 0; i < page_count; i++) {
1369 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1370 osc_consume_write_grant(cli, pga[i]);
1372 spin_unlock(&cli->cl_loi_list_lock);
1375 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1376 page_count, pga, &request);
1379 request->rq_interpret_reply = brw_interpret;
1380 ptlrpc_set_add_req(set, request);
1381 } else if (cmd == OBD_BRW_WRITE) {
1382 spin_lock(&cli->cl_loi_list_lock);
1383 for (i = 0; i < page_count; i++)
1384 osc_release_write_grant(cli, pga[i], 0);
1385 spin_unlock(&cli->cl_loi_list_lock);
1392 * ugh, we want disk allocation on the target to happen in offset order. we'll
1393 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1394 * fine for our small page arrays and doesn't require allocation. it's an
1395 * insertion sort that swaps elements that are strides apart, shrinking the
1396 * stride down until it's 1 and the array is sorted.
1398 static void sort_brw_pages(struct brw_page **array, int num)
1401 struct brw_page *tmp;
1405 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1410 for (i = stride ; i < num ; i++) {
1413 while (j >= stride && array[j-stride]->off > tmp->off) {
1414 array[j] = array[j - stride];
1419 } while (stride > 1);
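/* Return how many of the leading pages in @pg form one contiguous,
 * page-aligned run: only the first page may start at a non-zero page offset
 * and only the last may end short of a page boundary, so the run can be sent
 * as a single unfragmented bulk transfer. */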
1422 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1428 LASSERT (pages > 0);
1429 offset = pg[i]->off & (~CFS_PAGE_MASK);
1433 if (pages == 0) /* that's all */
1436 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1437 return count; /* doesn't end on page boundary */
1440 offset = pg[i]->off & (~CFS_PAGE_MASK);
1441 if (offset != 0) /* doesn't start on page boundary */
1448 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1450 struct brw_page **ppga;
1453 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1457 for (i = 0; i < count; i++)
1462 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1464 LASSERT(ppga != NULL);
1465 OBD_FREE(ppga, sizeof(*ppga) * count);
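/* Synchronous BRW entry point: build a sorted page array, split it into
 * chunks of at most cl_max_pages_per_rpc unfragmented pages, and issue one
 * blocking osc_brw_internal() call per chunk, saving and restoring the obdo
 * around each call since the server reply clobbers it. */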
1468 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1469 obd_count page_count, struct brw_page *pga,
1470 struct obd_trans_info *oti)
1472 struct obdo *saved_oa = NULL;
1473 struct brw_page **ppga, **orig;
1474 struct obd_import *imp = class_exp2cliimp(exp);
1475 struct client_obd *cli = &imp->imp_obd->u.cli;
1476 int rc, page_count_orig;
1479 if (cmd & OBD_BRW_CHECK) {
1480 /* The caller just wants to know if there's a chance that this
1481 * I/O can succeed */
1483 if (imp == NULL || imp->imp_invalid)
1488 /* test_brw with a failed create can trip this, maybe others. */
1489 LASSERT(cli->cl_max_pages_per_rpc);
1493 orig = ppga = osc_build_ppga(pga, page_count);
1496 page_count_orig = page_count;
1498 sort_brw_pages(ppga, page_count);
1499 while (page_count) {
1500 obd_count pages_per_brw;
1502 if (page_count > cli->cl_max_pages_per_rpc)
1503 pages_per_brw = cli->cl_max_pages_per_rpc;
1505 pages_per_brw = page_count;
1507 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1509 if (saved_oa != NULL) {
1510 /* restore previously saved oa */
1511 *oinfo->oi_oa = *saved_oa;
1512 } else if (page_count > pages_per_brw) {
1513 /* save a copy of oa (brw will clobber it) */
1514 OBDO_ALLOC(saved_oa);
1515 if (saved_oa == NULL)
1516 GOTO(out, rc = -ENOMEM);
1517 *saved_oa = *oinfo->oi_oa;
1520 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1521 pages_per_brw, ppga);
1526 page_count -= pages_per_brw;
1527 ppga += pages_per_brw;
1531 osc_release_ppga(orig, page_count_orig);
1533 if (saved_oa != NULL)
1534 OBDO_FREE(saved_oa);
1539 static int osc_brw_async(int cmd, struct obd_export *exp,
1540 struct obd_info *oinfo, obd_count page_count,
1541 struct brw_page *pga, struct obd_trans_info *oti,
1542 struct ptlrpc_request_set *set)
1544 struct brw_page **ppga, **orig;
1545 int page_count_orig;
1549 if (cmd & OBD_BRW_CHECK) {
1550 /* The caller just wants to know if there's a chance that this
1551 * I/O can succeed */
1552 struct obd_import *imp = class_exp2cliimp(exp);
1554 if (imp == NULL || imp->imp_invalid)
1559 orig = ppga = osc_build_ppga(pga, page_count);
1562 page_count_orig = page_count;
1564 sort_brw_pages(ppga, page_count);
1565 while (page_count) {
1566 struct brw_page **copy;
1567 obd_count pages_per_brw;
1569 pages_per_brw = min_t(obd_count, page_count,
1570 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1572 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1574 /* use ppga only if single RPC is going to fly */
1575 if (pages_per_brw != page_count_orig || ppga != orig) {
1576 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1578 GOTO(out, rc = -ENOMEM);
1579 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1583 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1584 pages_per_brw, copy, set);
1588 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1593 /* we passed it to async_internal() which is
1594 * now responsible for releasing memory */
1598 page_count -= pages_per_brw;
1599 ppga += pages_per_brw;
1603 osc_release_ppga(orig, page_count_orig);
1607 static void osc_check_rpcs(struct client_obd *cli);
1609 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1610 * the dirty accounting. Writeback completes or truncate happens before
1611 * writing starts. Must be called with the loi lock held. */
1612 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1615 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1618 /* This maintains the lists of pending pages to read/write for a given object
1619 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1620 * to quickly find objects that are ready to send an RPC. */
1621 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1627 if (lop->lop_num_pending == 0)
1630 /* if we have an invalid import we want to drain the queued pages
1631 * by forcing them through rpcs that immediately fail and complete
1632 * the pages. recovery relies on this to empty the queued pages
1633 * before canceling the locks and evicting down the llite pages */
1634 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1637 /* stream rpcs in queue order as long as there is an urgent page
1638 * queued. this is our cheap solution for good batching in the case
1639 * where writepage marks some random page in the middle of the file
1640 * as urgent because of, say, memory pressure */
1641 if (!list_empty(&lop->lop_urgent)) {
1642 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1646 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1647 optimal = cli->cl_max_pages_per_rpc;
1648 if (cmd & OBD_BRW_WRITE) {
1649 /* trigger a write rpc stream as long as there are dirtiers
1650 * waiting for space. as they're waiting, they're not going to
1651 * create more pages to coalesce with what's waiting.. */
1652 if (!list_empty(&cli->cl_cache_waiters)) {
1653 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1657 /* +16 to avoid triggering rpcs that would want to include pages
1658 * that are being queued but which can't be made ready until
1659 * the queuer finishes with the page. this is a wart for
1660 * llite::commit_write() */
1663 if (lop->lop_num_pending >= optimal)
1669 static void on_list(struct list_head *item, struct list_head *list,
1672 if (list_empty(item) && should_be_on)
1673 list_add_tail(item, list);
1674 else if (!list_empty(item) && !should_be_on)
1675 list_del_init(item);
1678 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1679 * can find pages to build into rpcs quickly */
1680 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1682 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1683 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1684 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1686 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1687 loi->loi_write_lop.lop_num_pending);
1689 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1690 loi->loi_read_lop.lop_num_pending);
1693 static void lop_update_pending(struct client_obd *cli,
1694 struct loi_oap_pages *lop, int cmd, int delta)
1696 lop->lop_num_pending += delta;
1697 if (cmd & OBD_BRW_WRITE)
1698 cli->cl_pending_w_pages += delta;
1700 cli->cl_pending_r_pages += delta;
1703 /* this is called when a sync waiter receives an interruption. Its job is to
1704 * get the caller woken as soon as possible. If its page hasn't been put in an
1705 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1706 * desiring interruption which will forcefully complete the rpc once the rpc arrives. */
1708 static void osc_occ_interrupted(struct oig_callback_context *occ)
1710 struct osc_async_page *oap;
1711 struct loi_oap_pages *lop;
1712 struct lov_oinfo *loi;
1715 /* XXX member_of() */
1716 oap = list_entry(occ, struct osc_async_page, oap_occ);
1718 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1720 oap->oap_interrupted = 1;
1722 /* ok, it's been put in an rpc. only one oap gets a request reference */
1723 if (oap->oap_request != NULL) {
1724 ptlrpc_mark_interrupted(oap->oap_request);
1725 ptlrpcd_wake(oap->oap_request);
1729 /* we don't get interruption callbacks until osc_trigger_group_io()
1730 * has been called and put the sync oaps in the pending/urgent lists.*/
1731 if (!list_empty(&oap->oap_pending_item)) {
1732 list_del_init(&oap->oap_pending_item);
1733 list_del_init(&oap->oap_urgent_item);
1736 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1737 &loi->loi_write_lop : &loi->loi_read_lop;
1738 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1739 loi_list_maint(oap->oap_cli, oap->oap_loi);
1741 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1742 oap->oap_oig = NULL;
1746 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1749 /* this is trying to propagate async writeback errors back up to the
1750 * application. As an async write fails we record the error code for later if
1751 * the app does an fsync. As long as errors persist we force future rpcs to be
1752 * sync so that the app can get a sync error and break the cycle of queueing
1753 * pages for which writeback will fail. */
1754 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1761 ar->ar_force_sync = 1;
1762 ar->ar_min_xid = ptlrpc_sample_next_xid();
1767 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1768 ar->ar_force_sync = 0;
1771 static void osc_oap_to_pending(struct osc_async_page *oap)
1773 struct loi_oap_pages *lop;
1775 if (oap->oap_cmd & OBD_BRW_WRITE)
1776 lop = &oap->oap_loi->loi_write_lop;
1778 lop = &oap->oap_loi->loi_read_lop;
1780 if (oap->oap_async_flags & ASYNC_URGENT)
1781 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1782 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1783 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1786 /* this must be called holding the loi list lock to give coverage to exit_cache,
1787 * async_flag maintenance, and oap_request */
1788 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1789 struct osc_async_page *oap, int sent, int rc)
1794 if (oap->oap_request != NULL) {
1795 xid = ptlrpc_req_xid(oap->oap_request);
1796 ptlrpc_req_finished(oap->oap_request);
1797 oap->oap_request = NULL;
1800 oap->oap_async_flags = 0;
1801 oap->oap_interrupted = 0;
1803 if (oap->oap_cmd & OBD_BRW_WRITE) {
1804 osc_process_ar(&cli->cl_ar, xid, rc);
1805 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1808 if (rc == 0 && oa != NULL) {
1809 if (oa->o_valid & OBD_MD_FLBLOCKS)
1810 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1811 if (oa->o_valid & OBD_MD_FLMTIME)
1812 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1813 if (oa->o_valid & OBD_MD_FLATIME)
1814 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1815 if (oa->o_valid & OBD_MD_FLCTIME)
1816 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1820 osc_exit_cache(cli, oap, sent);
1821 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1822 oap->oap_oig = NULL;
1827 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1828 oap->oap_cmd, oa, rc);
1830 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1831 * I/O on the page could start, but OSC calls it under lock
1832 * and thus we can add oap back to pending safely */
1834 /* upper layer wants to leave the page on pending queue */
1835 osc_oap_to_pending(oap);
1837 osc_exit_cache(cli, oap, sent);
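/* Completion callback for BRWs built from cached async pages: finish the
 * request (redoing it on recoverable errors), drop the read/write in-flight
 * count, complete every attached oap, then wake cache waiters and see if
 * more RPCs can be sent. */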
1841 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1843 struct osc_brw_async_args *aa = data;
1844 struct osc_async_page *oap, *tmp;
1845 struct client_obd *cli;
1848 rc = osc_brw_fini_request(request, rc);
1849 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1851 if (osc_recoverable_error(rc)) {
1852 rc = osc_brw_redo_request(request, aa);
1859 client_obd_list_lock(&cli->cl_loi_list_lock);
1861 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1862 * is called so we know whether to go to sync BRWs or wait for more
1863 * RPCs to complete */
1864 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1865 cli->cl_w_in_flight--;
1867 cli->cl_r_in_flight--;
1869 /* the caller may re-use the oap after the completion call so
1870 * we need to clean it up a little */
1871 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1872 list_del_init(&oap->oap_rpc_item);
1873 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1876 osc_wake_cache_waiters(cli);
1877 osc_check_rpcs(cli);
1879 client_obd_list_unlock(&cli->cl_loi_list_lock);
1881 OBDO_FREE(aa->aa_oa);
1883 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
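/* Turn a list of ready osc_async_pages into a BRW request: collect their
 * brw_pages into an array, fill an obdo via the caller's ap_fill_obdo hook,
 * build the request with osc_brw_prep_request(), refresh the timestamps via
 * ap_update_obdo, and hand the oap list over to the request's async args. */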
1887 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1888 struct list_head *rpc_list,
1889 int page_count, int cmd)
1891 struct ptlrpc_request *req;
1892 struct brw_page **pga = NULL;
1893 struct osc_brw_async_args *aa;
1894 struct obdo *oa = NULL;
1895 struct obd_async_page_ops *ops = NULL;
1896 void *caller_data = NULL;
1897 struct osc_async_page *oap;
1901 LASSERT(!list_empty(rpc_list));
1903 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1905 RETURN(ERR_PTR(-ENOMEM));
1909 GOTO(out, req = ERR_PTR(-ENOMEM));
1912 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1914 ops = oap->oap_caller_ops;
1915 caller_data = oap->oap_caller_data;
1917 pga[i] = &oap->oap_brw_page;
1918 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1919 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1920 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1924 /* always get the data for the obdo for the rpc */
1925 LASSERT(ops != NULL);
1926 ops->ap_fill_obdo(caller_data, cmd, oa);
1928 sort_brw_pages(pga, page_count);
1929 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1931 CERROR("prep_req failed: %d\n", rc);
1932 GOTO(out, req = ERR_PTR(rc));
1935 /* Need to update the timestamps after the request is built in case
1936 * we race with setattr (locally or in queue at OST). If OST gets
1937 * later setattr before earlier BRW (as determined by the request xid),
1938 * the OST will not use BRW timestamps. Sadly, there is no obvious
1939 * way to do this in a single call. bug 10150 */
1940 ops->ap_update_obdo(caller_data, cmd, oa,
1941 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1943 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1944 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1945 INIT_LIST_HEAD(&aa->aa_oaps);
1946 list_splice(rpc_list, &aa->aa_oaps);
1947 INIT_LIST_HEAD(rpc_list);
1954 OBD_FREE(pga, sizeof(*pga) * page_count);
1959 /* the loi lock is held across this function but it's allowed to release
1960 * and reacquire it during its work */
1961 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1962 int cmd, struct loi_oap_pages *lop)
1964 struct ptlrpc_request *req;
1965 obd_count page_count = 0;
1966 struct osc_async_page *oap = NULL, *tmp;
1967 struct osc_brw_async_args *aa;
1968 struct obd_async_page_ops *ops;
1969 CFS_LIST_HEAD(rpc_list);
1970 unsigned int ending_offset;
1971 unsigned starting_offset = 0;
1974 /* first we find the pages we're allowed to work with */
1975 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
1976 ops = oap->oap_caller_ops;
1978 LASSERT(oap->oap_magic == OAP_MAGIC);
1980 /* in llite being 'ready' equates to the page being locked
1981 * until completion unlocks it. commit_write submits a page
1982 * as not ready because its unlock will happen unconditionally
1983 * as the call returns. if we race with commit_write giving
1984 * us that page we don't want to create a hole in the page
1985 * stream, so we stop and leave the rpc to be fired by
1986 * another dirtier or kupdated interval (the not ready page
1987 * will still be on the dirty list). we could call in
1988 * at the end of ll_file_write to process the queue again. */
1989 if (!(oap->oap_async_flags & ASYNC_READY)) {
1990 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1992 CDEBUG(D_INODE, "oap %p page %p returned %d "
1993 "instead of ready\n", oap,
1997 /* llite is telling us that the page is still
1998 * in commit_write and that we should try
1999 * and put it in an rpc again later. we
2000 * break out of the loop so we don't create
2001 * a hole in the sequence of pages in the rpc
2006 /* the io isn't needed.. tell the checks
2007 * below to complete the rpc with EINTR */
2008 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2009 oap->oap_count = -EINTR;
2012 oap->oap_async_flags |= ASYNC_READY;
2015 LASSERTF(0, "oap %p page %p returned %d "
2016 "from make_ready\n", oap,
2024 * Page submitted for IO has to be locked. Either by
2025 * ->ap_make_ready() or by higher layers.
2027 * XXX nikita: this assertion should be adjusted when lustre
2028 * starts using PG_writeback for pages being written out.
2030 #if defined(__KERNEL__) && defined(__LINUX__)
2031 LASSERT(PageLocked(oap->oap_page));
2033 /* If there is a gap at the start of this page, it can't merge
2034 * with any previous page, so we'll hand the network a
2035 * "fragmented" page array that it can't transfer in 1 RDMA */
2036 if (page_count != 0 && oap->oap_page_off != 0)
2039 /* take the page out of our book-keeping */
2040 list_del_init(&oap->oap_pending_item);
2041 lop_update_pending(cli, lop, cmd, -1);
2042 list_del_init(&oap->oap_urgent_item);
2044 if (page_count == 0)
2045 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2046 (PTLRPC_MAX_BRW_SIZE - 1);
2048 /* ask the caller for the size of the io as the rpc leaves. */
2049 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2051 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2052 if (oap->oap_count <= 0) {
2053 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2055 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2059 /* now put the page back in our accounting */
2060 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2061 if (++page_count >= cli->cl_max_pages_per_rpc)
2064 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2065 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2066 * have the same alignment as the initial writes that allocated
2067 * extents on the server. */
2068 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2069 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2070 if (ending_offset == 0)
2073 /* If there is a gap at the end of this page, it can't merge
2074 * with any subsequent pages, so we'll hand the network a
2075 * "fragmented" page array that it can't transfer in 1 RDMA */
2076 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2080 osc_wake_cache_waiters(cli);
2082 if (page_count == 0)
2085 loi_list_maint(cli, loi);
2087 client_obd_list_unlock(&cli->cl_loi_list_lock);
2089 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2091 /* this should happen rarely and is pretty bad, it makes the
2092 * pending list not follow the dirty order */
2093 client_obd_list_lock(&cli->cl_loi_list_lock);
2094 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2095 list_del_init(&oap->oap_rpc_item);
2097 /* queued sync pages can be torn down while the pages
2098 * were between the pending list and the rpc */
2099 if (oap->oap_interrupted) {
2100 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2101 osc_ap_completion(cli, NULL, oap, 0,
2105 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2107 loi_list_maint(cli, loi);
2108 RETURN(PTR_ERR(req));
2111 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2112 if (cmd == OBD_BRW_READ) {
2113 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2114 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2115 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2116 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2117 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2119 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2120 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2121 cli->cl_w_in_flight);
2122 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2123 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2124 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2127 client_obd_list_lock(&cli->cl_loi_list_lock);
2129 if (cmd == OBD_BRW_READ)
2130 cli->cl_r_in_flight++;
2132 cli->cl_w_in_flight++;
2134 /* queued sync pages can be torn down while the pages
2135 * were between the pending list and the rpc */
2137 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2138 /* only one oap gets a request reference */
2141 if (oap->oap_interrupted && !req->rq_intr) {
2142 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2144 ptlrpc_mark_interrupted(req);
2148 tmp->oap_request = ptlrpc_request_addref(req);
2150 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2151 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2153 req->rq_interpret_reply = brw_interpret_oap;
2154 ptlrpcd_add_req(req);
2158 #define LOI_DEBUG(LOI, STR, args...) \
2159 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2160 !list_empty(&(LOI)->loi_cli_item), \
2161 (LOI)->loi_write_lop.lop_num_pending, \
2162 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2163 (LOI)->loi_read_lop.lop_num_pending, \
2164 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2167 /* This is called by osc_check_rpcs() to find which objects have pages that
2168 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2169 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2172 /* first return all objects which we already know to have
2173 * pages ready to be stuffed into rpcs */
2174 if (!list_empty(&cli->cl_loi_ready_list))
2175 RETURN(list_entry(cli->cl_loi_ready_list.next,
2176 struct lov_oinfo, loi_cli_item));
2178 /* then if we have cache waiters, return all objects with queued
2179 * writes. This is especially important when many small files
2180 * have filled up the cache and not been fired into rpcs because
2181 * they don't pass the nr_pending/object threshold */
2182 if (!list_empty(&cli->cl_cache_waiters) &&
2183 !list_empty(&cli->cl_loi_write_list))
2184 RETURN(list_entry(cli->cl_loi_write_list.next,
2185 struct lov_oinfo, loi_write_item));
2187 /* then return all queued objects when we have an invalid import
2188 * so that they get flushed */
2189 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2190 if (!list_empty(&cli->cl_loi_write_list))
2191 RETURN(list_entry(cli->cl_loi_write_list.next,
2192 struct lov_oinfo, loi_write_item));
2193 if (!list_empty(&cli->cl_loi_read_list))
2194 RETURN(list_entry(cli->cl_loi_read_list.next,
2195 struct lov_oinfo, loi_read_item));
2200 /* called with the loi list lock held */
2201 static void osc_check_rpcs(struct client_obd *cli)
2203 struct lov_oinfo *loi;
2204 int rc = 0, race_counter = 0;
2207 while ((loi = osc_next_loi(cli)) != NULL) {
2208 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2210 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2213 /* attempt some read/write balancing by alternating between
2214 * reads and writes on an object. The lop_makes_rpc() checks here
2215 * would be redundant if we were given read/write work items
2216 * instead of objects. We don't want osc_send_oap_rpc() to drain a
2217 * partial read pending queue when we're handed this object to
2218 * do write io on while there are cache waiters */
2219 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2220 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2221 &loi->loi_write_lop);
2229 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2230 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2231 &loi->loi_read_lop);
2240 /* attempt some inter-object balancing by issuing rpcs
2241 * for each object in turn */
2242 if (!list_empty(&loi->loi_cli_item))
2243 list_del_init(&loi->loi_cli_item);
2244 if (!list_empty(&loi->loi_write_item))
2245 list_del_init(&loi->loi_write_item);
2246 if (!list_empty(&loi->loi_read_item))
2247 list_del_init(&loi->loi_read_item);
2249 loi_list_maint(cli, loi);
2251 /* osc_send_oap_rpc() fails with 0 when make_ready tells it to
2252 * back off. llite's make_ready does this when it tries
2253 * to lock a page queued for write that is already locked.
2254 * We want to try sending rpcs from many objects, but we
2255 * don't want to spin failing with 0. */
2256 if (race_counter == 10)
2262 /* we're trying to queue a page in the osc so we're subject to the
2263 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2264 * If the osc's queued pages are already at that limit, then we want to sleep
2265 * until there is space in the osc's queue for us. We also may be waiting for
2266 * write credits from the OST if there are RPCs in flight that may return some
2267 * before we fall back to sync writes.
2269 * We need this to know whether our allocation was granted in the presence of signals */
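/* Roughly, osc_enter_cache() below does:
 *
 *      if (cl_dirty + CFS_PAGE_SIZE <= cl_dirty_max &&
 *          obd_dirty_pages + 1 <= obd_max_dirty_pages &&
 *          cl_avail_grant >= CFS_PAGE_SIZE)
 *              consume the grant and succeed immediately;
 *      else if (cl_w_in_flight)
 *              add an osc_cache_waiter and l_wait_event() on ocw_granted();
 *      else
 *              fail so the caller falls back to sync i/o.
 */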
2270 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2274 client_obd_list_lock(&cli->cl_loi_list_lock);
2275 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2276 client_obd_list_unlock(&cli->cl_loi_list_lock);
2280 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2281 * grant or cache space. */
2282 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2283 struct osc_async_page *oap)
2285 struct osc_cache_waiter ocw;
2286 struct l_wait_info lwi = { 0 };
2289 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2290 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2291 cli->cl_dirty_max, obd_max_dirty_pages,
2292 cli->cl_lost_grant, cli->cl_avail_grant);
2294 /* force the caller to try sync io. this can jump the list
2295 * of queued writes and create a discontiguous rpc stream */
2296 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2297 loi->loi_ar.ar_force_sync)
2300 /* Hopefully normal case - cache space and write credits available */
2301 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2302 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2303 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2304 /* account for ourselves */
2305 osc_consume_write_grant(cli, &oap->oap_brw_page);
2309 /* Make sure that there are write rpcs in flight to wait for. This
2310 * is a little silly as this object may not have any pending but
2311 * other objects sure might. */
2312 if (cli->cl_w_in_flight) {
2313 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2314 cfs_waitq_init(&ocw.ocw_waitq);
2318 loi_list_maint(cli, loi);
2319 osc_check_rpcs(cli);
2320 client_obd_list_unlock(&cli->cl_loi_list_lock);
2322 CDEBUG(D_CACHE, "sleeping for cache space\n");
2323 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2325 client_obd_list_lock(&cli->cl_loi_list_lock);
2326 if (!list_empty(&ocw.ocw_entry)) {
2327 list_del(&ocw.ocw_entry);
2336 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2337 struct lov_oinfo *loi, cfs_page_t *page,
2338 obd_off offset, struct obd_async_page_ops *ops,
2339 void *data, void **res)
2341 struct osc_async_page *oap;
2345 return size_round(sizeof(*oap));
2348 oap->oap_magic = OAP_MAGIC;
2349 oap->oap_cli = &exp->exp_obd->u.cli;
2352 oap->oap_caller_ops = ops;
2353 oap->oap_caller_data = data;
2355 oap->oap_page = page;
2356 oap->oap_obj_off = offset;
2358 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2359 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2360 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2362 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2364 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2368 struct osc_async_page *oap_from_cookie(void *cookie)
2370 struct osc_async_page *oap = cookie;
2371 if (oap->oap_magic != OAP_MAGIC)
2372 return ERR_PTR(-EINVAL);
2376 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2377 struct lov_oinfo *loi, void *cookie,
2378 int cmd, obd_off off, int count,
2379 obd_flag brw_flags, enum async_flags async_flags)
2381 struct client_obd *cli = &exp->exp_obd->u.cli;
2382 struct osc_async_page *oap;
2386 oap = oap_from_cookie(cookie);
2388 RETURN(PTR_ERR(oap));
2390 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2393 if (!list_empty(&oap->oap_pending_item) ||
2394 !list_empty(&oap->oap_urgent_item) ||
2395 !list_empty(&oap->oap_rpc_item))
2398 /* check if the file's owner/group is over quota */
2399 #ifdef HAVE_QUOTA_SUPPORT
2400 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2401 struct obd_async_page_ops *ops;
2408 ops = oap->oap_caller_ops;
2409 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2410 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2421 loi = lsm->lsm_oinfo[0];
2423 client_obd_list_lock(&cli->cl_loi_list_lock);
2426 oap->oap_page_off = off;
2427 oap->oap_count = count;
2428 oap->oap_brw_flags = brw_flags;
2429 oap->oap_async_flags = async_flags;
2431 if (cmd & OBD_BRW_WRITE) {
2432 rc = osc_enter_cache(cli, loi, oap);
2434 client_obd_list_unlock(&cli->cl_loi_list_lock);
2439 osc_oap_to_pending(oap);
2440 loi_list_maint(cli, loi);
2442 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2445 osc_check_rpcs(cli);
2446 client_obd_list_unlock(&cli->cl_loi_list_lock);
2451 /* aka (~was & now & flag), but this is more clear :) */
2452 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
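/* e.g. SETTING(0, ASYNC_READY, ASYNC_READY) is true (the flag is being
 * newly set), while SETTING(ASYNC_READY, ASYNC_READY, ASYNC_READY) is
 * false (the flag was already set). */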
2454 static int osc_set_async_flags(struct obd_export *exp,
2455 struct lov_stripe_md *lsm,
2456 struct lov_oinfo *loi, void *cookie,
2457 obd_flag async_flags)
2459 struct client_obd *cli = &exp->exp_obd->u.cli;
2460 struct loi_oap_pages *lop;
2461 struct osc_async_page *oap;
2465 oap = oap_from_cookie(cookie);
2467 RETURN(PTR_ERR(oap));
2470 * bug 7311: OST-side locking is only supported for liblustre for now
2471 * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2472 * implementation has to handle case where OST-locked page was picked
2473 * up by, e.g., ->writepage().
2475 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2476 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to tread here */
2479 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2483 loi = lsm->lsm_oinfo[0];
2485 if (oap->oap_cmd & OBD_BRW_WRITE) {
2486 lop = &loi->loi_write_lop;
2488 lop = &loi->loi_read_lop;
2491 client_obd_list_lock(&cli->cl_loi_list_lock);
2493 if (list_empty(&oap->oap_pending_item))
2494 GOTO(out, rc = -EINVAL);
2496 if ((oap->oap_async_flags & async_flags) == async_flags)
2499 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2500 oap->oap_async_flags |= ASYNC_READY;
2502 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2503 if (list_empty(&oap->oap_rpc_item)) {
2504 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2505 loi_list_maint(cli, loi);
2509 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2510 oap->oap_async_flags);
2512 osc_check_rpcs(cli);
2513 client_obd_list_unlock(&cli->cl_loi_list_lock);
2517 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2518 struct lov_oinfo *loi,
2519 struct obd_io_group *oig, void *cookie,
2520 int cmd, obd_off off, int count,
2522 obd_flag async_flags)
2524 struct client_obd *cli = &exp->exp_obd->u.cli;
2525 struct osc_async_page *oap;
2526 struct loi_oap_pages *lop;
2530 oap = oap_from_cookie(cookie);
2532 RETURN(PTR_ERR(oap));
2534 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2537 if (!list_empty(&oap->oap_pending_item) ||
2538 !list_empty(&oap->oap_urgent_item) ||
2539 !list_empty(&oap->oap_rpc_item))
2543 loi = lsm->lsm_oinfo[0];
2545 client_obd_list_lock(&cli->cl_loi_list_lock);
2548 oap->oap_page_off = off;
2549 oap->oap_count = count;
2550 oap->oap_brw_flags = brw_flags;
2551 oap->oap_async_flags = async_flags;
2553 if (cmd & OBD_BRW_WRITE)
2554 lop = &loi->loi_write_lop;
2556 lop = &loi->loi_read_lop;
2558 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2559 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2561 rc = oig_add_one(oig, &oap->oap_occ);
2564 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2565 oap, oap->oap_page, rc);
2567 client_obd_list_unlock(&cli->cl_loi_list_lock);
2572 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2573 struct loi_oap_pages *lop, int cmd)
2575 struct list_head *pos, *tmp;
2576 struct osc_async_page *oap;
2578 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2579 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2580 list_del(&oap->oap_pending_item);
2581 osc_oap_to_pending(oap);
2583 loi_list_maint(cli, loi);
2586 static int osc_trigger_group_io(struct obd_export *exp,
2587 struct lov_stripe_md *lsm,
2588 struct lov_oinfo *loi,
2589 struct obd_io_group *oig)
2591 struct client_obd *cli = &exp->exp_obd->u.cli;
2595 loi = lsm->lsm_oinfo[0];
2597 client_obd_list_lock(&cli->cl_loi_list_lock);
2599 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2600 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2602 osc_check_rpcs(cli);
2603 client_obd_list_unlock(&cli->cl_loi_list_lock);
2608 static int osc_teardown_async_page(struct obd_export *exp,
2609 struct lov_stripe_md *lsm,
2610 struct lov_oinfo *loi, void *cookie)
2612 struct client_obd *cli = &exp->exp_obd->u.cli;
2613 struct loi_oap_pages *lop;
2614 struct osc_async_page *oap;
2618 oap = oap_from_cookie(cookie);
2620 RETURN(PTR_ERR(oap));
2623 loi = lsm->lsm_oinfo[0];
2625 if (oap->oap_cmd & OBD_BRW_WRITE) {
2626 lop = &loi->loi_write_lop;
2628 lop = &loi->loi_read_lop;
2631 client_obd_list_lock(&cli->cl_loi_list_lock);
2633 if (!list_empty(&oap->oap_rpc_item))
2634 GOTO(out, rc = -EBUSY);
2636 osc_exit_cache(cli, oap, 0);
2637 osc_wake_cache_waiters(cli);
2639 if (!list_empty(&oap->oap_urgent_item)) {
2640 list_del_init(&oap->oap_urgent_item);
2641 oap->oap_async_flags &= ~ASYNC_URGENT;
2643 if (!list_empty(&oap->oap_pending_item)) {
2644 list_del_init(&oap->oap_pending_item);
2645 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2647 loi_list_maint(cli, loi);
2649 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2651 client_obd_list_unlock(&cli->cl_loi_list_lock);
2655 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2658 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2661 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2664 lock_res_and_lock(lock);
2667 /* Liang XXX: Darwin and Winnt checking should be added */
2668 if (lock->l_ast_data && lock->l_ast_data != data) {
2669 struct inode *new_inode = data;
2670 struct inode *old_inode = lock->l_ast_data;
2671 if (!(old_inode->i_state & I_FREEING))
2672 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2673 LASSERTF(old_inode->i_state & I_FREEING,
2674 "Found existing inode %p/%lu/%u state %lu in lock: "
2675 "setting data to %p/%lu/%u\n", old_inode,
2676 old_inode->i_ino, old_inode->i_generation,
2678 new_inode, new_inode->i_ino, new_inode->i_generation);
2682 lock->l_ast_data = data;
2683 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2684 unlock_res_and_lock(lock);
2685 LDLM_LOCK_PUT(lock);
2688 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2689 ldlm_iterator_t replace, void *data)
2691 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2692 struct obd_device *obd = class_exp2obd(exp);
2694 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2698 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2704 /* The request was created before ldlm_cli_enqueue call. */
2705 if (rc == ELDLM_LOCK_ABORTED) {
2706 struct ldlm_reply *rep;
2708 /* swabbed by ldlm_cli_enqueue() */
2709 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2710 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2712 LASSERT(rep != NULL);
2713 if (rep->lock_policy_res1)
2714 rc = rep->lock_policy_res1;
2718 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2719 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2720 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2721 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2722 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2725 /* Call the update callback. */
2726 rc = oinfo->oi_cb_up(oinfo, rc);
2730 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2731 struct osc_enqueue_args *aa, int rc)
2733 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2734 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2735 struct ldlm_lock *lock;
2737 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2739 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2741 /* Complete obtaining the lock procedure. */
2742 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2744 &aa->oa_oi->oi_flags,
2745 &lsm->lsm_oinfo[0]->loi_lvb,
2746 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2747 lustre_swab_ost_lvb,
2748 aa->oa_oi->oi_lockh, rc);
2750 /* Complete osc stuff. */
2751 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2753 /* Release the lock for async request. */
2754 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2755 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2757 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2758 aa->oa_oi->oi_lockh, req, aa);
2759 LDLM_LOCK_PUT(lock);
2763 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2764 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2765 * other synchronous requests; however, holding some locks while trying to obtain
2766 * others may take a considerable amount of time in the case of OST failure, and
2767 * when other sync requests cannot get a lock released from a client, the client
2768 * is excluded from the cluster -- such scenarios make life difficult, so
2769 * release locks just after they are obtained. */
2770 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2771 struct ldlm_enqueue_info *einfo,
2772 struct ptlrpc_request_set *rqset)
2774 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2775 struct obd_device *obd = exp->exp_obd;
2776 struct ldlm_reply *rep;
2777 struct ptlrpc_request *req = NULL;
2778 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2782 /* Filesystem lock extents are extended to page boundaries so that
2783 * dealing with the page cache is a little smoother. */
2784 oinfo->oi_policy.l_extent.start -=
2785 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2786 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
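/* For example, with 4 KB pages (~CFS_PAGE_MASK == 0xfff) a request for
 * bytes [0x1234, 0x2345] becomes the lock extent [0x1000, 0x2fff]: the
 * start is rounded down to its page boundary and the end up to the last
 * byte of its page. */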
2788 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2791 /* Next, search for already existing extent locks that will cover us */
2792 rc = ldlm_lock_match(obd->obd_namespace,
2793 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2794 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2797 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2800 /* I would like to be able to ASSERT here that rss <=
2801 * kms, but I can't, for reasons which are explained in
2805 /* We already have a lock, and it's referenced */
2806 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2808 /* For async requests, decref the lock. */
2810 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2815 /* If we're trying to read, we also search for an existing PW lock. The
2816 * VFS and page cache already protect us locally, so lots of readers/
2817 * writers can share a single PW lock.
2819 * There are problems with conversion deadlocks, so instead of
2820 * converting a read lock to a write lock, we'll just enqueue a new
2823 * At some point we should cancel the read lock instead of making them
2824 * send us a blocking callback, but there are problems with canceling
2825 * locks out from other users right now, too. */
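/* For example, a synchronous read that finds an existing PW lock on its
 * extent just takes a PR reference on that lock instead of enqueuing a
 * new one, so readers and the writer share the single PW lock. */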
2827 if (einfo->ei_mode == LCK_PR) {
2828 rc = ldlm_lock_match(obd->obd_namespace,
2829 oinfo->oi_flags | LDLM_FL_LVB_READY,
2830 &res_id, einfo->ei_type, &oinfo->oi_policy,
2831 LCK_PW, oinfo->oi_lockh);
2833 /* FIXME: This is not incredibly elegant, but it might
2834 * be more elegant than adding another parameter to
2835 * lock_match. I want a second opinion. */
2836 /* addref the lock only if not async requests. */
2838 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2839 osc_set_data_with_check(oinfo->oi_lockh,
2842 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2843 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2851 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2852 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2853 [DLM_LOCKREQ_OFF + 1] = 0 };
2855 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2859 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2860 size[DLM_REPLY_REC_OFF] =
2861 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2862 ptlrpc_req_set_repsize(req, 3, size);
2865 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2866 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2868 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
2869 &oinfo->oi_policy, &oinfo->oi_flags,
2870 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2871 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2872 lustre_swab_ost_lvb, oinfo->oi_lockh,
2876 struct osc_enqueue_args *aa;
2877 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2878 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2883 req->rq_interpret_reply = osc_enqueue_interpret;
2884 ptlrpc_set_add_req(rqset, req);
2885 } else if (intent) {
2886 ptlrpc_req_finished(req);
2891 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2893 ptlrpc_req_finished(req);
2898 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2899 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2900 int *flags, void *data, struct lustre_handle *lockh)
2902 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2903 struct obd_device *obd = exp->exp_obd;
2905 int lflags = *flags;
2908 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2910 /* Filesystem lock extents are extended to page boundaries so that
2911 * dealing with the page cache is a little smoother */
2912 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2913 policy->l_extent.end |= ~CFS_PAGE_MASK;
2915 /* Next, search for already existing extent locks that will cover us */
2916 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2917 policy, mode, lockh);
2919 //if (!(*flags & LDLM_FL_TEST_LOCK))
2920 osc_set_data_with_check(lockh, data, lflags);
2923 /* If we're trying to read, we also search for an existing PW lock. The
2924 * VFS and page cache already protect us locally, so lots of readers/
2925 * writers can share a single PW lock. */
2926 if (mode == LCK_PR) {
2927 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2929 policy, LCK_PW, lockh);
2930 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2931 /* FIXME: This is not incredibly elegant, but it might
2932 * be more elegant than adding another parameter to
2933 * lock_match. I want a second opinion. */
2934 osc_set_data_with_check(lockh, data, lflags);
2935 ldlm_lock_addref(lockh, LCK_PR);
2936 ldlm_lock_decref(lockh, LCK_PW);
2942 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2943 __u32 mode, struct lustre_handle *lockh)
2947 if (unlikely(mode == LCK_GROUP))
2948 ldlm_lock_decref_and_cancel(lockh, mode);
2950 ldlm_lock_decref(lockh, mode);
2955 static int osc_cancel_unused(struct obd_export *exp,
2956 struct lov_stripe_md *lsm, int flags, void *opaque)
2958 struct obd_device *obd = class_exp2obd(exp);
2959 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2961 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2965 static int osc_join_lru(struct obd_export *exp,
2966 struct lov_stripe_md *lsm, int join)
2968 struct obd_device *obd = class_exp2obd(exp);
2969 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2971 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2974 static int osc_statfs_interpret(struct ptlrpc_request *req,
2975 struct osc_async_args *aa, int rc)
2977 struct obd_statfs *msfs;
2983 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2984 lustre_swab_obd_statfs);
2986 CERROR("Can't unpack obd_statfs\n");
2987 GOTO(out, rc = -EPROTO);
2990 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2992 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2996 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2997 __u64 max_age, struct ptlrpc_request_set *rqset)
2999 struct ptlrpc_request *req;
3000 struct osc_async_args *aa;
3001 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3004 /* We could possibly pass max_age in the request (as an absolute
3005 * timestamp or a "seconds.usec ago") so the target can avoid doing
3006 * extra calls into the filesystem if that isn't necessary (e.g.
3007 * during mount that would help a bit). Having relative timestamps
3008 * is not so great if request processing is slow, while absolute
3009 * timestamps are not ideal because they need time synchronization. */
3010 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3011 OST_STATFS, 1, NULL, NULL);
3015 ptlrpc_req_set_repsize(req, 2, size);
3016 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3018 req->rq_interpret_reply = osc_statfs_interpret;
3019 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3020 aa = (struct osc_async_args *)&req->rq_async_args;
3023 ptlrpc_set_add_req(rqset, req);
3027 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3030 struct obd_statfs *msfs;
3031 struct ptlrpc_request *req;
3032 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3035 /* We could possibly pass max_age in the request (as an absolute
3036 * timestamp or a "seconds.usec ago") so the target can avoid doing
3037 * extra calls into the filesystem if that isn't necessary (e.g.
3038 * during mount that would help a bit). Having relative timestamps
3039 * is not so great if request processing is slow, while absolute
3040 * timestamps are not ideal because they need time synchronization. */
3041 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3042 OST_STATFS, 1, NULL, NULL);
3046 ptlrpc_req_set_repsize(req, 2, size);
3047 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3049 rc = ptlrpc_queue_wait(req);
3053 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3054 lustre_swab_obd_statfs);
3056 CERROR("Can't unpack obd_statfs\n");
3057 GOTO(out, rc = -EPROTO);
3060 memcpy(osfs, msfs, sizeof(*osfs));
3064 ptlrpc_req_finished(req);
3068 /* Retrieve object striping information.
3070 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3071 * the maximum number of OST indices which will fit in the user buffer.
3072 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here). */
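/* A user-space caller would use this roughly as follows (a sketch; the
 * exact struct layout comes from lustre/lustre_user.h, and fd is an open
 * file on a Lustre mount):
 *
 *      struct lov_user_md *lum;
 *      unsigned long long object_id;
 *
 *      lum = malloc(sizeof(*lum) + sizeof(lum->lmm_objects[0]));
 *      lum->lmm_magic = LOV_USER_MAGIC;
 *      lum->lmm_stripe_count = 1;
 *      if (ioctl(fd, LL_IOC_LOV_GETSTRIPE, lum) == 0)
 *              object_id = lum->lmm_objects[0].l_object_id;
 */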
3074 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3076 struct lov_user_md lum, *lumk;
3077 int rc = 0, lum_size;
3083 if (copy_from_user(&lum, lump, sizeof(lum)))
3086 if (lum.lmm_magic != LOV_USER_MAGIC)
3089 if (lum.lmm_stripe_count > 0) {
3090 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3091 OBD_ALLOC(lumk, lum_size);
3095 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3097 lum_size = sizeof(lum);
3101 lumk->lmm_object_id = lsm->lsm_object_id;
3102 lumk->lmm_stripe_count = 1;
3104 if (copy_to_user(lump, lumk, lum_size))
3108 OBD_FREE(lumk, lum_size);
3114 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3115 void *karg, void *uarg)
3117 struct obd_device *obd = exp->exp_obd;
3118 struct obd_ioctl_data *data = karg;
3122 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3125 if (!try_module_get(THIS_MODULE)) {
3126 CERROR("Can't get module. Is it alive?");
3131 case OBD_IOC_LOV_GET_CONFIG: {
3133 struct lov_desc *desc;
3134 struct obd_uuid uuid;
3138 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3139 GOTO(out, err = -EINVAL);
3141 data = (struct obd_ioctl_data *)buf;
3143 if (sizeof(*desc) > data->ioc_inllen1) {
3144 obd_ioctl_freedata(buf, len);
3145 GOTO(out, err = -EINVAL);
3148 if (data->ioc_inllen2 < sizeof(uuid)) {
3149 obd_ioctl_freedata(buf, len);
3150 GOTO(out, err = -EINVAL);
3153 desc = (struct lov_desc *)data->ioc_inlbuf1;
3154 desc->ld_tgt_count = 1;
3155 desc->ld_active_tgt_count = 1;
3156 desc->ld_default_stripe_count = 1;
3157 desc->ld_default_stripe_size = 0;
3158 desc->ld_default_stripe_offset = 0;
3159 desc->ld_pattern = 0;
3160 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3162 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3164 err = copy_to_user((void *)uarg, buf, len);
3167 obd_ioctl_freedata(buf, len);
3170 case LL_IOC_LOV_SETSTRIPE:
3171 err = obd_alloc_memmd(exp, karg);
3175 case LL_IOC_LOV_GETSTRIPE:
3176 err = osc_getstripe(karg, uarg);
3178 case OBD_IOC_CLIENT_RECOVER:
3179 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3184 case IOC_OSC_SET_ACTIVE:
3185 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3188 case OBD_IOC_POLL_QUOTACHECK:
3189 err = lquota_poll_check(quota_interface, exp,
3190 (struct if_quotacheck *)karg);
3193 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3194 cmd, cfs_curproc_comm());
3195 GOTO(out, err = -ENOTTY);
3198 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3201 module_put(THIS_MODULE);
3206 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3207 void *key, __u32 *vallen, void *val)
3210 if (!vallen || !val)
3213 if (keylen > strlen("lock_to_stripe") &&
3214 strcmp(key, "lock_to_stripe") == 0) {
3215 __u32 *stripe = val;
3216 *vallen = sizeof(*stripe);
3219 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3220 struct ptlrpc_request *req;
3222 char *bufs[2] = { NULL, key };
3223 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3225 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3226 OST_GET_INFO, 2, size, bufs);
3230 size[REPLY_REC_OFF] = *vallen;
3231 ptlrpc_req_set_repsize(req, 2, size);
3232 rc = ptlrpc_queue_wait(req);
3236 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3237 lustre_swab_ost_last_id);
3238 if (reply == NULL) {
3239 CERROR("Can't unpack OST last ID\n");
3240 GOTO(out, rc = -EPROTO);
3242 *((obd_id *)val) = *reply;
3244 ptlrpc_req_finished(req);
3250 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3253 struct llog_ctxt *ctxt;
3254 struct obd_import *imp = req->rq_import;
3260 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3263 rc = llog_initiator_connect(ctxt);
3265 CERROR("cannot establish connection for "
3266 "ctxt %p: %d\n", ctxt, rc);
3269 spin_lock(&imp->imp_lock);
3270 imp->imp_server_timeout = 1;
3271 imp->imp_pingable = 1;
3272 spin_unlock(&imp->imp_lock);
3273 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3278 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3279 void *key, obd_count vallen, void *val,
3280 struct ptlrpc_request_set *set)
3282 struct ptlrpc_request *req;
3283 struct obd_device *obd = exp->exp_obd;
3284 struct obd_import *imp = class_exp2cliimp(exp);
3285 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3286 char *bufs[3] = { NULL, key, val };
3289 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3291 if (KEY_IS(KEY_NEXT_ID)) {
3292 if (vallen != sizeof(obd_id))
3294 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3295 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3296 exp->exp_obd->obd_name,
3297 obd->u.cli.cl_oscc.oscc_next_id);
3302 if (KEY_IS("unlinked")) {
3303 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3304 spin_lock(&oscc->oscc_lock);
3305 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3306 spin_unlock(&oscc->oscc_lock);
3310 if (KEY_IS(KEY_INIT_RECOV)) {
3311 if (vallen != sizeof(int))
3313 spin_lock(&imp->imp_lock);
3314 imp->imp_initial_recov = *(int *)val;
3315 spin_unlock(&imp->imp_lock);
3316 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3317 exp->exp_obd->obd_name,
3318 imp->imp_initial_recov);
3322 if (KEY_IS("checksum")) {
3323 if (vallen != sizeof(int))
3325 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3332 /* We pass all other commands directly to OST. Since nobody calls osc
3333 methods directly and everybody is supposed to go through LOV, we
3334 assume lov checked invalid values for us.
3335 The only recognised values so far are evict_by_nid and mds_conn.
3336 Even if something bad goes through, we'd get a -EINVAL from OST. */
3339 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3344 if (KEY_IS(KEY_MDS_CONN))
3345 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3347 ptlrpc_req_set_repsize(req, 1, NULL);
3348 ptlrpc_set_add_req(set, req);
3349 ptlrpc_check_set(set);
3355 static struct llog_operations osc_size_repl_logops = {
3356 lop_cancel: llog_obd_repl_cancel
3359 static struct llog_operations osc_mds_ost_orig_logops;
3360 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3361 int count, struct llog_catid *catid,
3362 struct obd_uuid *uuid)
3367 spin_lock(&obd->obd_dev_lock);
3368 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3369 osc_mds_ost_orig_logops = llog_lvfs_ops;
3370 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3371 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3372 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3373 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3375 spin_unlock(&obd->obd_dev_lock);
3377 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3378 &catid->lci_logid, &osc_mds_ost_orig_logops);
3380 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3384 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3385 &osc_size_repl_logops);
3387 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3390 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3391 obd->obd_name, tgt->obd_name, count, catid, rc);
3392 CERROR("logid "LPX64":0x%x\n",
3393 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3398 static int osc_llog_finish(struct obd_device *obd, int count)
3400 struct llog_ctxt *ctxt;
3401 int rc = 0, rc2 = 0;
3404 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3406 rc = llog_cleanup(ctxt);
3408 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3410 rc2 = llog_cleanup(ctxt);
3417 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3418 struct obd_uuid *cluuid,
3419 struct obd_connect_data *data)
3421 struct client_obd *cli = &obd->u.cli;
3423 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3426 client_obd_list_lock(&cli->cl_loi_list_lock);
3427 data->ocd_grant = cli->cl_avail_grant ?:
3428 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3429 lost_grant = cli->cl_lost_grant;
3430 cli->cl_lost_grant = 0;
3431 client_obd_list_unlock(&cli->cl_loi_list_lock);
3433 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3434 "cl_lost_grant: %ld\n", data->ocd_grant,
3435 cli->cl_avail_grant, lost_grant);
3436 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3437 " ocd_grant: %d\n", data->ocd_connect_flags,
3438 data->ocd_version, data->ocd_grant);
3444 static int osc_disconnect(struct obd_export *exp)
3446 struct obd_device *obd = class_exp2obd(exp);
3447 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3450 if (obd->u.cli.cl_conn_count == 1)
3451 /* flush any remaining cancel messages out to the target */
3452 llog_sync(ctxt, exp);
3454 rc = client_disconnect_export(exp);
3458 static int osc_import_event(struct obd_device *obd,
3459 struct obd_import *imp,
3460 enum obd_import_event event)
3462 struct client_obd *cli;
3466 LASSERT(imp->imp_obd == obd);
3469 case IMP_EVENT_DISCON: {
3470 /* Only do this on the MDS OSC's */
3471 if (imp->imp_server_timeout) {
3472 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3474 spin_lock(&oscc->oscc_lock);
3475 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3476 spin_unlock(&oscc->oscc_lock);
3479 client_obd_list_lock(&cli->cl_loi_list_lock);
3480 cli->cl_avail_grant = 0;
3481 cli->cl_lost_grant = 0;
3482 client_obd_list_unlock(&cli->cl_loi_list_lock);
3486 case IMP_EVENT_INACTIVE: {
3487 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3490 case IMP_EVENT_INVALIDATE: {
3491 struct ldlm_namespace *ns = obd->obd_namespace;
3495 client_obd_list_lock(&cli->cl_loi_list_lock);
3496 /* all pages go to failing rpcs due to the invalid import */
3497 osc_check_rpcs(cli);
3498 client_obd_list_unlock(&cli->cl_loi_list_lock);
3500 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3504 case IMP_EVENT_ACTIVE: {
3505 /* Only do this on the MDS OSC's */
3506 if (imp->imp_server_timeout) {
3507 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3509 spin_lock(&oscc->oscc_lock);
3510 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3511 spin_unlock(&oscc->oscc_lock);
3513 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3516 case IMP_EVENT_OCD: {
3517 struct obd_connect_data *ocd = &imp->imp_connect_data;
3519 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3520 osc_init_grant(&obd->u.cli, ocd);
3523 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3524 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3526 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3530 CERROR("Unknown import event %d\n", event);
3536 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3542 rc = ptlrpcd_addref();
3546 rc = client_obd_setup(obd, len, buf);
3550 struct lprocfs_static_vars lvars;
3551 struct client_obd *cli = &obd->u.cli;
3553 lprocfs_init_vars(osc, &lvars);
3554 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3555 lproc_osc_attach_seqstat(obd);
3556 ptlrpc_lprocfs_register_obd(obd);
3560 /* We need to allocate a few extra requests, because
3561 brw_interpret_oap tries to create new requests before freeing
3562 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3563 reserved, but I am afraid that might be too much wasted RAM
3564 in practice, so 2 is just my guess and should still work. */
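/* For example, with cl_max_rpcs_in_flight == 8 the pool below is created
 * with 10 requests. */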
3565 cli->cl_import->imp_rq_pool =
3566 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3568 ptlrpc_add_rqs_to_pool);
3574 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3580 case OBD_CLEANUP_EARLY: {
3581 struct obd_import *imp;
3582 imp = obd->u.cli.cl_import;
3583 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3584 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3585 ptlrpc_deactivate_import(imp);
3588 case OBD_CLEANUP_EXPORTS: {
3589 /* If we set up but never connected, the
3590 client import will not have been cleaned. */
3591 if (obd->u.cli.cl_import) {
3592 struct obd_import *imp;
3593 imp = obd->u.cli.cl_import;
3594 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3596 ptlrpc_invalidate_import(imp);
3597 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3598 class_destroy_import(imp);
3599 obd->u.cli.cl_import = NULL;
3603 case OBD_CLEANUP_SELF_EXP:
3604 rc = obd_llog_finish(obd, 0);
3606 CERROR("failed to cleanup llogging subsystems\n");
3608 case OBD_CLEANUP_OBD:
3614 int osc_cleanup(struct obd_device *obd)
3616 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3620 ptlrpc_lprocfs_unregister_obd(obd);
3621 lprocfs_obd_cleanup(obd);
3623 spin_lock(&oscc->oscc_lock);
3624 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3625 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3626 spin_unlock(&oscc->oscc_lock);
3628 /* free memory of osc quota cache */
3629 lquota_cleanup(quota_interface, obd);
3631 rc = client_obd_cleanup(obd);
3637 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3639 struct lustre_cfg *lcfg = buf;
3640 struct lprocfs_static_vars lvars;
3643 lprocfs_init_vars(osc, &lvars);
3645 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3649 struct obd_ops osc_obd_ops = {
3650 .o_owner = THIS_MODULE,
3651 .o_setup = osc_setup,
3652 .o_precleanup = osc_precleanup,
3653 .o_cleanup = osc_cleanup,
3654 .o_add_conn = client_import_add_conn,
3655 .o_del_conn = client_import_del_conn,
3656 .o_connect = client_connect_import,
3657 .o_reconnect = osc_reconnect,
3658 .o_disconnect = osc_disconnect,
3659 .o_statfs = osc_statfs,
3660 .o_statfs_async = osc_statfs_async,
3661 .o_packmd = osc_packmd,
3662 .o_unpackmd = osc_unpackmd,
3663 .o_create = osc_create,
3664 .o_destroy = osc_destroy,
3665 .o_getattr = osc_getattr,
3666 .o_getattr_async = osc_getattr_async,
3667 .o_setattr = osc_setattr,
3668 .o_setattr_async = osc_setattr_async,
3670 .o_brw_async = osc_brw_async,
3671 .o_prep_async_page = osc_prep_async_page,
3672 .o_queue_async_io = osc_queue_async_io,
3673 .o_set_async_flags = osc_set_async_flags,
3674 .o_queue_group_io = osc_queue_group_io,
3675 .o_trigger_group_io = osc_trigger_group_io,
3676 .o_teardown_async_page = osc_teardown_async_page,
3677 .o_punch = osc_punch,
3679 .o_enqueue = osc_enqueue,
3680 .o_match = osc_match,
3681 .o_change_cbdata = osc_change_cbdata,
3682 .o_cancel = osc_cancel,
3683 .o_cancel_unused = osc_cancel_unused,
3684 .o_join_lru = osc_join_lru,
3685 .o_iocontrol = osc_iocontrol,
3686 .o_get_info = osc_get_info,
3687 .o_set_info_async = osc_set_info_async,
3688 .o_import_event = osc_import_event,
3689 .o_llog_init = osc_llog_init,
3690 .o_llog_finish = osc_llog_finish,
3691 .o_process_config = osc_process_config,
3693 int __init osc_init(void)
3695 struct lprocfs_static_vars lvars;
3699 atomic_set(&osc_resend_time, cfs_time_seconds(OSC_DEFAULT_TIMEOUT));
3700 lprocfs_init_vars(osc, &lvars);
3702 request_module("lquota");
3703 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3704 lquota_init(quota_interface);
3705 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3707 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3710 if (quota_interface)
3711 PORTAL_SYMBOL_PUT(osc_quota_interface);
3719 static void /*__exit*/ osc_exit(void)
3721 lquota_exit(quota_interface);
3722 if (quota_interface)
3723 PORTAL_SYMBOL_PUT(osc_quota_interface);
3725 class_unregister_type(LUSTRE_OSC_NAME);
3728 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3729 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3730 MODULE_LICENSE("GPL");
3732 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);