1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * You may have signed or agreed to another license before downloading
11 * this software. If so, you are bound by the terms and conditions
12 * of that agreement, and the following does not apply to you. See the
13 * LICENSE file included with this distribution for more information.
15 * If you did not agree to a different license, then this copy of Lustre
16 * is open source software; you can redistribute it and/or modify it
17 * under the terms of version 2 of the GNU General Public License as
18 * published by the Free Software Foundation.
20 * In either case, Lustre is distributed in the hope that it will be
21 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * license text for more details.
25 * For testing and management it is treated as an obd_device,
26 * although it does not export a full OBD method table (the
27 * requests are coming in over the wire, so object target modules
28 * do not have a full method table.)
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_OSC
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif /* __KERNEL__ */
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 atomic_t osc_resend_time;
72 /* Pack OSC object metadata for disk storage (LE byte order). */
73 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
74 struct lov_stripe_md *lsm)
79 lmm_size = sizeof(**lmmp);
84 OBD_FREE(*lmmp, lmm_size);
90 OBD_ALLOC(*lmmp, lmm_size);
96 LASSERT(lsm->lsm_object_id);
97 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105 struct lov_mds_md *lmm, int lmm_bytes)
111 if (lmm_bytes < sizeof (*lmm)) {
112 CERROR("lov_mds_md too small: %d, need %d\n",
113 lmm_bytes, (int)sizeof(*lmm));
116 /* XXX LOV_MAGIC etc check? */
118 if (lmm->lmm_object_id == 0) {
119 CERROR("lov_mds_md: zero lmm_object_id\n");
124 lsm_size = lov_stripe_md_size(1);
128 if (*lsmp != NULL && lmm == NULL) {
129 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130 OBD_FREE(*lsmp, lsm_size);
136 OBD_ALLOC(*lsmp, lsm_size);
139 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141 OBD_FREE(*lsmp, lsm_size);
144 loi_init((*lsmp)->lsm_oinfo[0]);
148 /* XXX zero *lsmp? */
149 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150 LASSERT((*lsmp)->lsm_object_id);
153 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
158 static int osc_getattr_interpret(struct ptlrpc_request *req,
159 struct osc_async_args *aa, int rc)
161 struct ost_body *body;
167 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
168 lustre_swab_ost_body);
170 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
171 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
173 /* This should really be sent by the OST */
174 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
175 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
177 CERROR("can't unpack ost_body\n");
179 aa->aa_oi->oi_oa->o_valid = 0;
182 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
186 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
187 struct ptlrpc_request_set *set)
189 struct ptlrpc_request *req;
190 struct ost_body *body;
191 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
192 struct osc_async_args *aa;
195 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
196 OST_GETATTR, 2, size, NULL);
200 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
201 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
203 ptlrpc_req_set_repsize(req, 2, size);
204 req->rq_interpret_reply = osc_getattr_interpret;
206 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
207 aa = (struct osc_async_args *)&req->rq_async_args;
210 ptlrpc_set_add_req(set, req);
214 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
216 struct ptlrpc_request *req;
217 struct ost_body *body;
218 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
221 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
222 OST_GETATTR, 2, size, NULL);
226 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
227 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
229 ptlrpc_req_set_repsize(req, 2, size);
231 rc = ptlrpc_queue_wait(req);
233 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
237 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
238 lustre_swab_ost_body);
240 CERROR ("can't unpack ost_body\n");
241 GOTO (out, rc = -EPROTO);
244 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
247 /* This should really be sent by the OST */
248 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
249 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
253 ptlrpc_req_finished(req);
257 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
258 struct obd_trans_info *oti)
260 struct ptlrpc_request *req;
261 struct ost_body *body;
262 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
265 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
266 OST_SETATTR, 2, size, NULL);
270 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
271 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
273 ptlrpc_req_set_repsize(req, 2, size);
275 rc = ptlrpc_queue_wait(req);
279 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
280 lustre_swab_ost_body);
282 GOTO(out, rc = -EPROTO);
284 memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
288 ptlrpc_req_finished(req);
292 static int osc_setattr_interpret(struct ptlrpc_request *req,
293 struct osc_async_args *aa, int rc)
295 struct ost_body *body;
301 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
302 lustre_swab_ost_body);
304 CERROR("can't unpack ost_body\n");
305 GOTO(out, rc = -EPROTO);
308 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
310 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
314 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
315 struct obd_trans_info *oti,
316 struct ptlrpc_request_set *rqset)
318 struct ptlrpc_request *req;
319 struct ost_body *body;
320 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
321 struct osc_async_args *aa;
324 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
325 OST_SETATTR, 2, size, NULL);
329 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
331 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
333 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
334 sizeof(*oti->oti_logcookies));
337 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
338 ptlrpc_req_set_repsize(req, 2, size);
339 /* do mds to ost setattr asynchronously */
341 /* Do not wait for response. */
342 ptlrpcd_add_req(req);
344 req->rq_interpret_reply = osc_setattr_interpret;
346 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
347 aa = (struct osc_async_args *)&req->rq_async_args;
350 ptlrpc_set_add_req(rqset, req);
356 int osc_real_create(struct obd_export *exp, struct obdo *oa,
357 struct lov_stripe_md **ea, struct obd_trans_info *oti)
359 struct ptlrpc_request *req;
360 struct ost_body *body;
361 struct lov_stripe_md *lsm;
362 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
370 rc = obd_alloc_memmd(exp, &lsm);
375 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
376 OST_CREATE, 2, size, NULL);
378 GOTO(out, rc = -ENOMEM);
380 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
381 memcpy(&body->oa, oa, sizeof(body->oa));
383 ptlrpc_req_set_repsize(req, 2, size);
384 if (oa->o_valid & OBD_MD_FLINLINE) {
385 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
386 oa->o_flags == OBD_FL_DELORPHAN);
388 "delorphan from OST integration");
389 /* Don't resend the delorphan req */
390 req->rq_no_resend = req->rq_no_delay = 1;
393 rc = ptlrpc_queue_wait(req);
397 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
398 lustre_swab_ost_body);
400 CERROR ("can't unpack ost_body\n");
401 GOTO (out_req, rc = -EPROTO);
404 memcpy(oa, &body->oa, sizeof(*oa));
406 /* This should really be sent by the OST */
407 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
408 oa->o_valid |= OBD_MD_FLBLKSZ;
410 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
411 * have valid lsm_oinfo data structs, so don't go touching that.
412 * This needs to be fixed in a big way.
414 lsm->lsm_object_id = oa->o_id;
418 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
420 if (oa->o_valid & OBD_MD_FLCOOKIE) {
421 if (!oti->oti_logcookies)
422 oti_alloc_cookies(oti, 1);
423 memcpy(oti->oti_logcookies, obdo_logcookie(oa),
424 sizeof(oti->oti_onecookie));
428 CDEBUG(D_HA, "transno: "LPD64"\n",
429 lustre_msg_get_transno(req->rq_repmsg));
431 ptlrpc_req_finished(req);
434 obd_free_memmd(exp, &lsm);
438 static int osc_punch_interpret(struct ptlrpc_request *req,
439 struct osc_async_args *aa, int rc)
441 struct ost_body *body;
447 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
448 lustre_swab_ost_body);
450 CERROR ("can't unpack ost_body\n");
451 GOTO(out, rc = -EPROTO);
454 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
456 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
460 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
461 struct obd_trans_info *oti,
462 struct ptlrpc_request_set *rqset)
464 struct ptlrpc_request *req;
465 struct osc_async_args *aa;
466 struct ost_body *body;
467 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
475 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
476 OST_PUNCH, 2, size, NULL);
480 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
482 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
483 memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
485 /* overload the size and blocks fields in the oa with start/end */
486 body->oa.o_size = oinfo->oi_policy.l_extent.start;
487 body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
488 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
490 ptlrpc_req_set_repsize(req, 2, size);
492 req->rq_interpret_reply = osc_punch_interpret;
493 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
494 aa = (struct osc_async_args *)&req->rq_async_args;
496 ptlrpc_set_add_req(rqset, req);
501 static int osc_sync(struct obd_export *exp, struct obdo *oa,
502 struct lov_stripe_md *md, obd_size start, obd_size end)
504 struct ptlrpc_request *req;
505 struct ost_body *body;
506 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
514 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
515 OST_SYNC, 2, size, NULL);
519 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
520 memcpy(&body->oa, oa, sizeof(*oa));
522 /* overload the size and blocks fields in the oa with start/end */
523 body->oa.o_size = start;
524 body->oa.o_blocks = end;
525 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
527 ptlrpc_req_set_repsize(req, 2, size);
529 rc = ptlrpc_queue_wait(req);
533 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
534 lustre_swab_ost_body);
536 CERROR ("can't unpack ost_body\n");
537 GOTO (out, rc = -EPROTO);
540 memcpy(oa, &body->oa, sizeof(*oa));
544 ptlrpc_req_finished(req);
548 /* Find and cancel locally the locks matched by @mode in the resource found
549 * by @objid. Found locks are added to the @cancels list. Returns the number
550 * of locks added to that list. */
551 static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
552 struct list_head *cancels, ldlm_mode_t mode,
555 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
556 struct ldlm_res_id res_id = { .name = { objid } };
557 struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
564 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565 lock_flags, 0, NULL);
566 ldlm_resource_putref(res);
570 /* Destroy requests can always be async on the client, and we don't even
571 * really care about the return code since the client cannot do anything
572 * at all about a failed destroy.
573 * When the MDS is unlinking a filename, it saves the file objects into a
574 * recovery llog, and these object records are cancelled when the OST reports
575 * they were destroyed and sync'd to disk (i.e. transaction committed).
576 * If the client dies, or the OST is down when the object should be destroyed,
577 * the records are not cancelled, and when the OST reconnects to the MDS next,
578 * it will retrieve the llog unlink logs and then send the log cancellation
579 * cookies to the MDS after committing destroy transactions. */
580 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
581 struct lov_stripe_md *ea, struct obd_trans_info *oti,
582 struct obd_export *md_export)
584 CFS_LIST_HEAD(cancels);
585 struct ptlrpc_request *req;
586 struct ost_body *body;
587 int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
588 int count, bufcount = 2;
596 count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
597 LDLM_FL_DISCARD_DATA);
598 if (exp_connect_cancelset(exp) && count) {
600 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count, OST_DESTROY);
602 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
603 OST_DESTROY, bufcount, size, NULL);
604 if (exp_connect_cancelset(exp) && req)
605 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1);
607 ldlm_lock_list_put(&cancels, l_bl_ast, count);
612 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
614 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
616 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
617 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
618 sizeof(*oti->oti_logcookies));
621 memcpy(&body->oa, oa, sizeof(*oa));
622 ptlrpc_req_set_repsize(req, 2, size);
624 ptlrpcd_add_req(req);
628 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
631 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
633 LASSERT(!(oa->o_valid & bits));
636 client_obd_list_lock(&cli->cl_loi_list_lock);
637 oa->o_dirty = cli->cl_dirty;
638 if (cli->cl_dirty > cli->cl_dirty_max) {
639 CERROR("dirty %lu > dirty_max %lu\n",
640 cli->cl_dirty, cli->cl_dirty_max);
642 } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
643 CERROR("dirty %d > system dirty_max %d\n",
644 atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
646 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
647 CERROR("dirty %lu - dirty_max %lu too big???\n",
648 cli->cl_dirty, cli->cl_dirty_max);
651 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
652 (cli->cl_max_rpcs_in_flight + 1);
653 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
655 oa->o_grant = cli->cl_avail_grant;
656 oa->o_dropped = cli->cl_lost_grant;
657 cli->cl_lost_grant = 0;
658 client_obd_list_unlock(&cli->cl_loi_list_lock);
659 CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
660 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
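/*
 * Worked example of the o_undirty calculation above, with values assumed
 * purely for illustration: if cl_max_pages_per_rpc = 256, CFS_PAGE_SHIFT = 12
 * and cl_max_rpcs_in_flight = 8, then max_in_flight = 256 * 4096 * 9 = 9 MiB,
 * so a client with cl_dirty_max = 32 MiB announces o_undirty = 32 MiB.
 */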
663 /* caller must hold loi_list_lock */
664 static void osc_consume_write_grant(struct client_obd *cli, struct brw_page *pga)
666 atomic_inc(&obd_dirty_pages);
667 cli->cl_dirty += CFS_PAGE_SIZE;
668 cli->cl_avail_grant -= CFS_PAGE_SIZE;
669 pga->flag |= OBD_BRW_FROM_GRANT;
670 CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
671 CFS_PAGE_SIZE, pga, pga->pg);
672 LASSERT(cli->cl_avail_grant >= 0);
675 /* the companion to osc_consume_write_grant, called when a brw has completed.
676 * must be called with the loi lock held. */
677 static void osc_release_write_grant(struct client_obd *cli,
678 struct brw_page *pga, int sent)
680 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
683 if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
688 pga->flag &= ~OBD_BRW_FROM_GRANT;
689 atomic_dec(&obd_dirty_pages);
690 cli->cl_dirty -= CFS_PAGE_SIZE;
692 cli->cl_lost_grant += CFS_PAGE_SIZE;
693 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
694 cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
695 } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
696 /* For short writes we shouldn't count parts of pages that
697 * span a whole block on the OST side, or our accounting goes
698 * wrong. Should match the code in filter_grant_check. */
699 int offset = pga->off & ~CFS_PAGE_MASK;
700 int count = pga->count + (offset & (blocksize - 1));
701 int end = (offset + pga->count) & (blocksize - 1);
702 if (end)
703 count += blocksize - end;
705 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
706 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
707 CFS_PAGE_SIZE - count, cli->cl_lost_grant,
708 cli->cl_avail_grant, cli->cl_dirty);
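/*
 * A minimal, self-contained sketch of the short-write grant arithmetic
 * above. The function name and the explicit page/block size parameters are
 * assumptions for illustration; this mirrors the rounding done by
 * osc_release_write_grant() (which should agree with filter_grant_check on
 * the OST) rather than defining it.
 */
#if 0 /* illustrative sketch only, not built */
static unsigned long sketch_lost_grant(unsigned long page_size,
                                       unsigned long blocksize,
                                       unsigned long off, unsigned long nob)
{
        unsigned long offset = off & (page_size - 1);  /* start within page */
        unsigned long count = nob + (offset & (blocksize - 1));
        unsigned long end = (offset + nob) & (blocksize - 1);

        if (end)
                count += blocksize - end; /* round write out to whole blocks */

        /* e.g. page_size 4096, blocksize 1024, off 0, nob 100: count rounds
         * up to 1024, so 4096 - 1024 = 3072 bytes of grant are "lost" */
        return page_size - count;
}
#endif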
714 static unsigned long rpcs_in_flight(struct client_obd *cli)
716 return cli->cl_r_in_flight + cli->cl_w_in_flight;
719 /* caller must hold loi_list_lock */
720 void osc_wake_cache_waiters(struct client_obd *cli)
722 struct list_head *l, *tmp;
723 struct osc_cache_waiter *ocw;
726 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
727 /* if we can't dirty more, we must wait until some is written */
728 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
729 (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
730 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
731 "osc max %ld, sys max %d\n", cli->cl_dirty,
732 cli->cl_dirty_max, obd_max_dirty_pages);
736 /* if we still have dirty cache but no grant, wait for pending RPCs that
737 * may yet return us some grant before doing sync writes */
738 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
739 CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
740 cli->cl_w_in_flight);
744 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
745 list_del_init(&ocw->ocw_entry);
746 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
747 /* no more RPCs in flight to return grant, do sync IO */
748 ocw->ocw_rc = -EDQUOT;
749 CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
751 osc_consume_write_grant(cli,
752 &ocw->ocw_oap->oap_brw_page);
755 cfs_waitq_signal(&ocw->ocw_waitq);
761 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
763 client_obd_list_lock(&cli->cl_loi_list_lock);
764 cli->cl_avail_grant = ocd->ocd_grant;
765 client_obd_list_unlock(&cli->cl_loi_list_lock);
767 CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
768 cli->cl_avail_grant, cli->cl_lost_grant);
769 LASSERT(cli->cl_avail_grant >= 0);
772 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
774 client_obd_list_lock(&cli->cl_loi_list_lock);
775 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
776 if (body->oa.o_valid & OBD_MD_FLGRANT)
777 cli->cl_avail_grant += body->oa.o_grant;
778 /* waiters are woken in brw_interpret_oap */
779 client_obd_list_unlock(&cli->cl_loi_list_lock);
782 /* We assume that the reason this OSC got a short read is that it read
783 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
784 * via the LOV, and it _knows_ it's reading inside the file, it's just that
785 * this stripe never got written at or beyond this stripe offset yet. */
786 static void handle_short_read(int nob_read, obd_count page_count,
787 struct brw_page **pga)
792 /* skip bytes read OK */
793 while (nob_read > 0) {
794 LASSERT (page_count > 0);
796 if (pga[i]->count > nob_read) {
797 /* EOF inside this page */
798 ptr = cfs_kmap(pga[i]->pg) +
799 (pga[i]->off & ~CFS_PAGE_MASK);
800 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
801 cfs_kunmap(pga[i]->pg);
807 nob_read -= pga[i]->count;
812 /* zero remaining pages */
813 while (page_count-- > 0) {
814 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
815 memset(ptr, 0, pga[i]->count);
816 cfs_kunmap(pga[i]->pg);
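/*
 * Example of the zeroing above, assuming 4096-byte pages: a 3-page read that
 * returns 5000 bytes leaves page 0 intact, zeroes bytes 904..4095 of page 1
 * (only 5000 - 4096 = 904 bytes of it were read), and zeroes all of page 2.
 */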
821 static int check_write_rcs(struct ptlrpc_request *req,
822 int requested_nob, int niocount,
823 obd_count page_count, struct brw_page **pga)
827 /* return error if any niobuf was in error */
828 remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
829 sizeof(*remote_rcs) * niocount, NULL);
830 if (remote_rcs == NULL) {
831 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
834 if (lustre_msg_swabbed(req->rq_repmsg))
835 for (i = 0; i < niocount; i++)
836 __swab32s(&remote_rcs[i]);
838 for (i = 0; i < niocount; i++) {
839 if (remote_rcs[i] < 0)
840 return(remote_rcs[i]);
842 if (remote_rcs[i] != 0) {
843 CERROR("rc[%d] invalid (%d) req %p\n",
844 i, remote_rcs[i], req);
849 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
850 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
851 req->rq_bulk->bd_nob_transferred, requested_nob);
858 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
860 if (p1->flag != p2->flag) {
861 unsigned mask = ~OBD_BRW_FROM_GRANT;
863 /* warn if we try to combine flags that we don't know to be
864 * safe to combine */
865 if ((p1->flag & mask) != (p2->flag & mask))
866 CERROR("is it ok to have flags 0x%x and 0x%x in the "
867 "same brw?\n", p1->flag, p2->flag);
871 return (p1->off + p1->count == p2->off);
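/*
 * Merging example: {off 0, count 4096} followed by {off 4096, count 4096}
 * satisfies p1->off + p1->count == p2->off and is coalesced into a single
 * niobuf by osc_brw_prep_request(); a short first page {off 0, count 2048}
 * leaves a gap, so the next page starts a new niobuf.
 */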
874 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
875 struct brw_page **pga, int opc)
880 LASSERT (pg_count > 0);
881 while (nob > 0 && pg_count > 0) {
882 char *ptr = cfs_kmap(pga[i]->pg);
883 int off = pga[i]->off & ~CFS_PAGE_MASK;
884 int count = pga[i]->count > nob ? nob : pga[i]->count;
886 /* corrupt the data before we compute the checksum, to
887 * simulate an OST->client data error */
888 if (i == 0 && opc == OST_READ &&
889 OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
890 memcpy(ptr + off, "bad1", min(4, nob));
891 cksum = crc32_le(cksum, ptr + off, count);
892 cfs_kunmap(pga[i]->pg);
893 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
896 nob -= pga[i]->count;
900 /* For sending we only compute the wrong checksum instead
901 * of corrupting the data so it is still correct on a redo */
902 if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
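/*
 * A self-contained userspace sketch of the page-wise bulk checksum above,
 * using zlib's crc32() as a stand-in for the kernel's crc32_le() and plain
 * buffers in place of kmapped pages. The struct and function names are
 * illustrative only, not part of the OSC API, and the fault-injection and
 * seeding details of the real code are omitted.
 */
#if 0 /* illustrative sketch only, not built */
#include <zlib.h>

struct sketch_page {
        const unsigned char *buf;   /* stands in for the kmapped page */
        int off;                    /* offset of the data within the page */
        int count;                  /* bytes of data in this page */
};

static unsigned long sketch_checksum_bulk(int nob, int pg_count,
                                          const struct sketch_page *pg)
{
        unsigned long cksum = crc32(0L, Z_NULL, 0);
        int i;

        /* checksum at most nob bytes, walking the pages in order */
        for (i = 0; nob > 0 && i < pg_count; i++) {
                int count = pg[i].count > nob ? nob : pg[i].count;

                cksum = crc32(cksum, pg[i].buf + pg[i].off, count);
                nob -= pg[i].count;
        }
        return cksum;
}
#endif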
908 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
909 struct lov_stripe_md *lsm, obd_count page_count,
910 struct brw_page **pga,
911 struct ptlrpc_request **reqp)
913 struct ptlrpc_request *req;
914 struct ptlrpc_bulk_desc *desc;
915 struct ost_body *body;
916 struct obd_ioobj *ioobj;
917 struct niobuf_remote *niobuf;
918 int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
919 int niocount, i, requested_nob, opc, rc;
920 struct ptlrpc_request_pool *pool;
921 struct osc_brw_async_args *aa;
922 struct brw_page *pg_prev;
925 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
926 OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
928 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
929 pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool : NULL;
931 for (niocount = i = 1; i < page_count; i++) {
932 if (!can_merge_pages(pga[i - 1], pga[i]))
936 size[REQ_REC_OFF + 1] = sizeof(*ioobj);
937 size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
939 req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
944 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
946 if (opc == OST_WRITE)
947 desc = ptlrpc_prep_bulk_imp (req, page_count,
948 BULK_GET_SOURCE, OST_BULK_PORTAL);
950 desc = ptlrpc_prep_bulk_imp (req, page_count,
951 BULK_PUT_SINK, OST_BULK_PORTAL);
953 GOTO(out, rc = -ENOMEM);
954 /* NB request now owns desc and will free it when it gets freed */
956 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
957 ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
958 niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
959 niocount * sizeof(*niobuf));
961 memcpy(&body->oa, oa, sizeof(*oa));
963 obdo_to_ioobj(oa, ioobj);
964 ioobj->ioo_bufcnt = niocount;
966 LASSERT (page_count > 0);
968 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
969 struct brw_page *pg = pga[i];
971 LASSERT(pg->count > 0);
972 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
973 "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
974 pg->off, pg->count);
975 #ifdef __LINUX__
976 LASSERTF(i == 0 || pg->off > pg_prev->off,
977 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
978 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
979 i, page_count,
980 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
981 pg_prev->pg, page_private(pg_prev->pg),
982 pg_prev->pg->index, pg_prev->off);
983 #else
984 LASSERTF(i == 0 || pg->off > pg_prev->off,
985 "i %d p_c %u\n", i, page_count);
986 #endif
987 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
988 (pg->flag & OBD_BRW_SRVLOCK));
990 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
992 requested_nob += pg->count;
994 if (i > 0 && can_merge_pages(pg_prev, pg)) {
996 niobuf->len += pg->count;
998 niobuf->offset = pg->off;
999 niobuf->len = pg->count;
1000 niobuf->flags = pg->flag;
1005 LASSERTF((void *)(niobuf - niocount) ==
1006 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1007 niocount * sizeof(*niobuf)),
1008 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1009 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1010 (void *)(niobuf - niocount));
1012 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1014 /* size[REQ_REC_OFF] still sizeof (*body) */
1015 if (opc == OST_WRITE) {
1016 if (unlikely(cli->cl_checksum)) {
1017 body->oa.o_valid |= OBD_MD_FLCKSUM;
1018 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1021 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1023 /* save this in 'oa', too, for later checking */
1024 oa->o_valid |= OBD_MD_FLCKSUM;
1026 /* clear out the checksum flag, in case this is a
1027 * resend but cl_checksum is no longer set. b=11238 */
1028 oa->o_valid &= ~OBD_MD_FLCKSUM;
1030 oa->o_cksum = body->oa.o_cksum;
1031 /* 1 RC per niobuf */
1032 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1033 ptlrpc_req_set_repsize(req, 3, size);
1035 if (unlikely(cli->cl_checksum))
1036 body->oa.o_valid |= OBD_MD_FLCKSUM;
1037 /* 1 RC for the whole I/O */
1038 ptlrpc_req_set_repsize(req, 2, size);
1041 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1042 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1044 aa->aa_requested_nob = requested_nob;
1045 aa->aa_nio_count = niocount;
1046 aa->aa_page_count = page_count;
1050 INIT_LIST_HEAD(&aa->aa_oaps);
1056 ptlrpc_req_finished (req);
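/*
 * For reference, the request assembled above carries four buffers:
 * [ptlrpc_body][ost_body][obd_ioobj][niobuf_remote * niocount], where runs
 * of contiguous pages with matching flags have each been coalesced into a
 * single niobuf_remote.
 */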
1060 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1061 __u32 client_cksum, __u32 server_cksum, int nob,
1062 obd_count page_count, struct brw_page **pga)
1067 if (server_cksum == client_cksum) {
1068 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1072 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1074 if (new_cksum == server_cksum)
1075 msg = "changed on the client after we checksummed it - "
1076 "likely false positive due to mmap IO (bug 11742)";
1077 else if (new_cksum == client_cksum)
1078 msg = "changed in transit before arrival at OST";
1080 msg = "changed in transit AND doesn't match the original - "
1081 "likely false positive due to mmap IO (bug 11742)";
1083 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1084 LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1085 "["LPU64"-"LPU64"]\n",
1086 msg, libcfs_nid2str(peer->nid),
1087 oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1088 oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1091 oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1093 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1094 CERROR("original client csum %x, server csum %x, client csum now %x\n",
1095 client_cksum, server_cksum, new_cksum);
1100 /* Note rc enters this function as number of bytes transferred */
1101 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1103 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1104 const lnet_process_id_t *peer =
1105 &req->rq_import->imp_connection->c_peer;
1106 struct client_obd *cli = aa->aa_cli;
1107 struct ost_body *body;
1108 __u32 client_cksum = 0;
1111 if (rc < 0 && rc != -EDQUOT)
1114 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1115 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1116 lustre_swab_ost_body);
1118 CERROR ("Can't unpack body\n");
1122 /* set/clear over quota flag for a uid/gid */
1123 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1124 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1125 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1126 body->oa.o_gid, body->oa.o_valid,
1132 if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1133 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1135 osc_update_grant(cli, body);
1137 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1139 CERROR ("Unexpected +ve rc %d\n", rc);
1142 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1144 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1146 check_write_checksum(&body->oa, peer, client_cksum,
1148 aa->aa_requested_nob,
1153 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1154 aa->aa_page_count, aa->aa_ppga);
1158 /* The rest of this function executes only for OST_READs */
1159 if (rc > aa->aa_requested_nob) {
1160 CERROR("Unexpected rc %d (%d requested)\n", rc,
1161 aa->aa_requested_nob);
1165 if (rc != req->rq_bulk->bd_nob_transferred) {
1166 CERROR ("Unexpected rc %d (%d transferred)\n",
1167 rc, req->rq_bulk->bd_nob_transferred);
1171 if (rc < aa->aa_requested_nob)
1172 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1174 if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1175 static int cksum_counter;
1176 __u32 server_cksum = body->oa.o_cksum;
1180 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1181 aa->aa_ppga, OST_READ);
1183 if (peer->nid == req->rq_bulk->bd_sender) {
1187 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1190 if (server_cksum == ~0 && rc > 0) {
1191 CERROR("Protocol error: server %s set the 'checksum' "
1192 "bit, but didn't send a checksum. Not fatal, "
1193 "but please tell CFS.\n",
1194 libcfs_nid2str(peer->nid));
1195 } else if (server_cksum != client_cksum) {
1196 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1197 "%s%s%s inum "LPU64"/"LPU64" object "
1198 LPU64"/"LPU64" extent "
1199 "["LPU64"-"LPU64"]\n",
1200 req->rq_import->imp_obd->obd_name,
1201 libcfs_nid2str(peer->nid),
1203 body->oa.o_valid & OBD_MD_FLFID ?
1204 body->oa.o_fid : (__u64)0,
1205 body->oa.o_valid & OBD_MD_FLFID ?
1206 body->oa.o_generation :(__u64)0,
1208 body->oa.o_valid & OBD_MD_FLGROUP ?
1209 body->oa.o_gr : (__u64)0,
1210 aa->aa_ppga[0]->off,
1211 aa->aa_ppga[aa->aa_page_count-1]->off +
1212 aa->aa_ppga[aa->aa_page_count-1]->count -
1214 CERROR("client %x, server %x\n",
1215 client_cksum, server_cksum);
1217 aa->aa_oa->o_cksum = client_cksum;
1221 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1224 } else if (unlikely(client_cksum)) {
1225 static int cksum_missed;
1228 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1229 CERROR("Checksum %u requested from %s but not sent\n",
1230 cksum_missed, libcfs_nid2str(peer->nid));
1236 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1241 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1242 struct lov_stripe_md *lsm,
1243 obd_count page_count, struct brw_page **pga)
1245 struct ptlrpc_request *request;
1249 struct l_wait_info lwi;
1252 init_waitqueue_head(&waitq);
1255 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1256 page_count, pga, &request);
1260 rc = ptlrpc_queue_wait(request);
1262 if (rc == -ETIMEDOUT && request->rq_resend) {
1263 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1264 ptlrpc_req_finished(request);
1268 rc = osc_brw_fini_request(request, rc);
1270 ptlrpc_req_finished(request);
1271 if (osc_recoverable_error(rc)) {
1273 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1274 CERROR("too many resend retries, returning error\n");
1278 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1279 l_wait_event(waitq, 0, &lwi);
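/*
 * Note the backoff above: each recoverable failure waits out a timeout of
 * "resends" seconds (the l_wait_event() condition is always false) before
 * the request is rebuilt, so the delay scales with the resend count until
 * osc_should_resend() gives up.
 */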
1286 int osc_brw_redo_request(struct ptlrpc_request *request,
1287 struct osc_brw_async_args *aa)
1289 struct ptlrpc_request *new_req;
1290 struct ptlrpc_request_set *set = request->rq_set;
1291 struct osc_brw_async_args *new_aa;
1292 struct osc_async_page *oap;
1296 if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1297 CERROR("too many resend retries, returning error\n");
1301 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1303 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1304 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1305 aa->aa_cli, aa->aa_oa,
1306 NULL /* lsm unused by osc currently */,
1307 aa->aa_page_count, aa->aa_ppga, &new_req);
1311 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1313 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1314 if (oap->oap_request != NULL) {
1315 LASSERTF(request == oap->oap_request,
1316 "request %p != oap_request %p\n",
1317 request, oap->oap_request);
1318 if (oap->oap_interrupted) {
1319 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1320 ptlrpc_req_finished(new_req);
1325 /* New request takes over pga and oaps from old request.
1326 * Note that copying a list_head doesn't work, need to move it... */
1328 new_req->rq_interpret_reply = request->rq_interpret_reply;
1329 new_req->rq_async_args = request->rq_async_args;
1330 new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1332 new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1334 INIT_LIST_HEAD(&new_aa->aa_oaps);
1335 list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1336 INIT_LIST_HEAD(&aa->aa_oaps);
1338 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1339 if (oap->oap_request) {
1340 ptlrpc_req_finished(oap->oap_request);
1341 oap->oap_request = ptlrpc_request_addref(new_req);
1344 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1346 DEBUG_REQ(D_INFO, new_req, "new request");
1348 ptlrpc_set_add_req(set, new_req);
1353 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1355 struct osc_brw_async_args *aa = data;
1360 rc = osc_brw_fini_request(request, rc);
1361 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1362 if (osc_recoverable_error(rc)) {
1363 rc = osc_brw_redo_request(request, aa);
1367 if ((rc >= 0) && request->rq_set && request->rq_set->set_countp)
1368 atomic_add(nob, (atomic_t *)request->rq_set->set_countp);
1369 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1370 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1371 aa->aa_cli->cl_w_in_flight--;
1373 aa->aa_cli->cl_r_in_flight--;
1375 for (i = 0; i < aa->aa_page_count; i++)
1376 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1377 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1378 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1383 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1384 struct lov_stripe_md *lsm, obd_count page_count,
1385 struct brw_page **pga, struct ptlrpc_request_set *set)
1387 struct ptlrpc_request *request;
1388 struct client_obd *cli = &exp->exp_obd->u.cli;
1390 struct osc_brw_async_args *aa;
1393 /* Consume write credits even if doing a sync write -
1394 * otherwise we may run out of space on OST due to grant. */
1395 if (cmd == OBD_BRW_WRITE) {
1396 client_obd_list_lock(&cli->cl_loi_list_lock);
1397 for (i = 0; i < page_count; i++) {
1398 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1399 osc_consume_write_grant(cli, pga[i]);
1401 client_obd_list_unlock(&cli->cl_loi_list_lock);
1404 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1405 page_count, pga, &request);
1407 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1408 if (cmd == OBD_BRW_READ) {
1409 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1410 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1411 ptlrpc_lprocfs_brw(request, OST_READ, aa->aa_requested_nob);
1413 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1414 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1415 cli->cl_w_in_flight);
1416 ptlrpc_lprocfs_brw(request, OST_WRITE, aa->aa_requested_nob);
1420 request->rq_interpret_reply = brw_interpret;
1421 ptlrpc_set_add_req(set, request);
1422 client_obd_list_lock(&cli->cl_loi_list_lock);
1423 if (cmd == OBD_BRW_READ)
1424 cli->cl_r_in_flight++;
1426 cli->cl_w_in_flight++;
1427 client_obd_list_unlock(&cli->cl_loi_list_lock);
1428 } else if (cmd == OBD_BRW_WRITE) {
1429 client_obd_list_lock(&cli->cl_loi_list_lock);
1430 for (i = 0; i < page_count; i++)
1431 osc_release_write_grant(cli, pga[i], 0);
1432 client_obd_list_unlock(&cli->cl_loi_list_lock);
1439 * ugh, we want disk allocation on the target to happen in offset order. we'll
1440 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1441 * fine for our small page arrays and doesn't require allocation. it's an
1442 * insertion sort that swaps elements that are strides apart, shrinking the
1443 * stride down until it's 1 and the array is sorted.
1445 static void sort_brw_pages(struct brw_page **array, int num)
1448 struct brw_page *tmp;
1452 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1457 for (i = stride ; i < num ; i++) {
1460 while (j >= stride && array[j-stride]->off > tmp->off) {
1461 array[j] = array[j - stride];
1466 } while (stride > 1);
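/*
 * The same 3x+1 shellsort restated over a plain array of offsets, as a
 * self-contained sketch (illustrative only; sort_brw_pages() above is the
 * real thing, keyed on brw_page->off).
 */
#if 0 /* illustrative sketch only, not built */
static void sketch_shellsort(unsigned long long *off, int num)
{
        int stride, i, j;

        /* grow the stride through 1, 4, 13, 40, ... to at least num */
        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;
        /* then shrink it by thirds, doing one insertion pass per stride */
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        unsigned long long tmp = off[i];

                        j = i;
                        while (j >= stride && off[j - stride] > tmp) {
                                off[j] = off[j - stride];
                                j -= stride;
                        }
                        off[j] = tmp;
                }
        } while (stride > 1);
}
#endif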
1469 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1475 LASSERT (pages > 0);
1476 offset = pg[i]->off & (~CFS_PAGE_MASK);
1480 if (pages == 0) /* that's all */
1483 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1484 return count; /* doesn't end on page boundary */
1487 offset = pg[i]->off & (~CFS_PAGE_MASK);
1488 if (offset != 0) /* doesn't start on page boundary */
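/*
 * Example, assuming 4096-byte pages and entries written as {in-page offset,
 * count}: {0, 4096} {0, 4096} {0, 2048} returns 3, since only the final page
 * may end short of a page boundary, while {0, 2048} {2048, 2048} returns 1
 * because the first page already ends mid-page.
 */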
1495 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1497 struct brw_page **ppga;
1500 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1504 for (i = 0; i < count; i++)
1509 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1511 LASSERT(ppga != NULL);
1512 OBD_FREE(ppga, sizeof(*ppga) * count);
1515 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1516 obd_count page_count, struct brw_page *pga,
1517 struct obd_trans_info *oti)
1519 struct obdo *saved_oa = NULL;
1520 struct brw_page **ppga, **orig;
1521 struct obd_import *imp = class_exp2cliimp(exp);
1522 struct client_obd *cli = &imp->imp_obd->u.cli;
1523 int rc, page_count_orig;
1526 if (cmd & OBD_BRW_CHECK) {
1527 /* The caller just wants to know if there's a chance that this
1528 * I/O can succeed */
1530 if (imp == NULL || imp->imp_invalid)
1535 /* test_brw with a failed create can trip this, maybe others. */
1536 LASSERT(cli->cl_max_pages_per_rpc);
1540 orig = ppga = osc_build_ppga(pga, page_count);
1543 page_count_orig = page_count;
1545 sort_brw_pages(ppga, page_count);
1546 while (page_count) {
1547 obd_count pages_per_brw;
1549 if (page_count > cli->cl_max_pages_per_rpc)
1550 pages_per_brw = cli->cl_max_pages_per_rpc;
1552 pages_per_brw = page_count;
1554 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1556 if (saved_oa != NULL) {
1557 /* restore previously saved oa */
1558 *oinfo->oi_oa = *saved_oa;
1559 } else if (page_count > pages_per_brw) {
1560 /* save a copy of oa (brw will clobber it) */
1561 OBDO_ALLOC(saved_oa);
1562 if (saved_oa == NULL)
1563 GOTO(out, rc = -ENOMEM);
1564 *saved_oa = *oinfo->oi_oa;
1567 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1568 pages_per_brw, ppga);
1573 page_count -= pages_per_brw;
1574 ppga += pages_per_brw;
1578 osc_release_ppga(orig, page_count_orig);
1580 if (saved_oa != NULL)
1581 OBDO_FREE(saved_oa);
1586 static int osc_brw_async(int cmd, struct obd_export *exp,
1587 struct obd_info *oinfo, obd_count page_count,
1588 struct brw_page *pga, struct obd_trans_info *oti,
1589 struct ptlrpc_request_set *set)
1591 struct brw_page **ppga, **orig;
1592 int page_count_orig;
1596 if (cmd & OBD_BRW_CHECK) {
1597 /* The caller just wants to know if there's a chance that this
1598 * I/O can succeed */
1599 struct obd_import *imp = class_exp2cliimp(exp);
1601 if (imp == NULL || imp->imp_invalid)
1606 orig = ppga = osc_build_ppga(pga, page_count);
1609 page_count_orig = page_count;
1611 sort_brw_pages(ppga, page_count);
1612 while (page_count) {
1613 struct brw_page **copy;
1614 obd_count pages_per_brw;
1616 pages_per_brw = min_t(obd_count, page_count,
1617 class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1619 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1621 /* use ppga only if a single RPC is going to fly */
1622 if (pages_per_brw != page_count_orig || ppga != orig) {
1623 OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1625 GOTO(out, rc = -ENOMEM);
1626 memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1630 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1631 pages_per_brw, copy, set);
1635 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1640 /* we passed it to async_internal() which is
1641 * now responsible for releasing memory */
1645 page_count -= pages_per_brw;
1646 ppga += pages_per_brw;
1650 osc_release_ppga(orig, page_count_orig);
1654 static void osc_check_rpcs(struct client_obd *cli);
1656 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1657 * the dirty accounting. Writeback completes or truncate happens before
1658 * writing starts. Must be called with the loi lock held. */
1659 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1662 osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1665 /* This maintains the lists of pending pages to read/write for a given object
1666 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1667 * to quickly find objects that are ready to send an RPC. */
1668 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1674 if (lop->lop_num_pending == 0)
1677 /* if we have an invalid import we want to drain the queued pages
1678 * by forcing them through rpcs that immediately fail and complete
1679 * the pages. recovery relies on this to empty the queued pages
1680 * before canceling the locks and evicting down the llite pages */
1681 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1684 /* stream rpcs in queue order as long as there is an urgent page
1685 * queued. this is our cheap solution for good batching in the case
1686 * where writepage marks some random page in the middle of the file
1687 * as urgent because of, say, memory pressure */
1688 if (!list_empty(&lop->lop_urgent)) {
1689 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1693 /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1694 optimal = cli->cl_max_pages_per_rpc;
1695 if (cmd & OBD_BRW_WRITE) {
1696 /* trigger a write rpc stream as long as there are dirtiers
1697 * waiting for space. as they're waiting, they're not going to
1698 * create more pages to coalesce with what's waiting.. */
1699 if (!list_empty(&cli->cl_cache_waiters)) {
1700 CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1704 /* +16 to avoid triggering rpcs that would want to include pages
1705 * that are being queued but which can't be made ready until
1706 * the queuer finishes with the page. this is a wart for
1707 * llite::commit_write() */
1710 if (lop->lop_num_pending >= optimal)
1716 static void on_list(struct list_head *item, struct list_head *list,
1719 if (list_empty(item) && should_be_on)
1720 list_add_tail(item, list);
1721 else if (!list_empty(item) && !should_be_on)
1722 list_del_init(item);
1725 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1726 * can find pages to build into rpcs quickly */
1727 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1729 on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1730 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1731 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1733 on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1734 loi->loi_write_lop.lop_num_pending);
1736 on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1737 loi->loi_read_lop.lop_num_pending);
1740 static void lop_update_pending(struct client_obd *cli,
1741 struct loi_oap_pages *lop, int cmd, int delta)
1743 lop->lop_num_pending += delta;
1744 if (cmd & OBD_BRW_WRITE)
1745 cli->cl_pending_w_pages += delta;
1747 cli->cl_pending_r_pages += delta;
1750 /* this is called when a sync waiter receives an interruption. Its job is to
1751 * get the caller woken as soon as possible. If its page hasn't been put in an
1752 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
1753 * desiring interruption which will forcefully complete the rpc once the rpc
1754 * completes. */
1755 static void osc_occ_interrupted(struct oig_callback_context *occ)
1757 struct osc_async_page *oap;
1758 struct loi_oap_pages *lop;
1759 struct lov_oinfo *loi;
1762 /* XXX member_of() */
1763 oap = list_entry(occ, struct osc_async_page, oap_occ);
1765 client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1767 oap->oap_interrupted = 1;
1769 /* ok, it's been put in an rpc. only one oap gets a request reference */
1770 if (oap->oap_request != NULL) {
1771 ptlrpc_mark_interrupted(oap->oap_request);
1772 ptlrpcd_wake(oap->oap_request);
1776 /* we don't get interruption callbacks until osc_trigger_group_io()
1777 * has been called and put the sync oaps in the pending/urgent lists.*/
1778 if (!list_empty(&oap->oap_pending_item)) {
1779 list_del_init(&oap->oap_pending_item);
1780 list_del_init(&oap->oap_urgent_item);
1783 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1784 &loi->loi_write_lop : &loi->loi_read_lop;
1785 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1786 loi_list_maint(oap->oap_cli, oap->oap_loi);
1788 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1789 oap->oap_oig = NULL;
1793 client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1796 /* this is trying to propagate async writeback errors back up to the
1797 * application. As an async write fails we record the error code for later if
1798 * the app does an fsync. As long as errors persist we force future rpcs to be
1799 * sync so that the app can get a sync error and break the cycle of queueing
1800 * pages for which writeback will fail. */
1801 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1808 ar->ar_force_sync = 1;
1809 ar->ar_min_xid = ptlrpc_sample_next_xid();
1814 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1815 ar->ar_force_sync = 0;
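/*
 * Flow example, with xids assumed for illustration: a write under xid 17
 * fails, so the error is recorded, ar_force_sync is set and ar_min_xid
 * becomes the next sampled xid, say 18. Writes then go out sync until one
 * with xid >= 18 succeeds, at which point ar_force_sync is cleared and
 * async writeback can resume.
 */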
1818 static void osc_oap_to_pending(struct osc_async_page *oap)
1820 struct loi_oap_pages *lop;
1822 if (oap->oap_cmd & OBD_BRW_WRITE)
1823 lop = &oap->oap_loi->loi_write_lop;
1825 lop = &oap->oap_loi->loi_read_lop;
1827 if (oap->oap_async_flags & ASYNC_URGENT)
1828 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1829 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1830 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1833 /* this must be called holding the loi list lock to give coverage to exit_cache,
1834 * async_flag maintenance, and oap_request */
1835 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1836 struct osc_async_page *oap, int sent, int rc)
1841 if (oap->oap_request != NULL) {
1842 xid = ptlrpc_req_xid(oap->oap_request);
1843 ptlrpc_req_finished(oap->oap_request);
1844 oap->oap_request = NULL;
1847 oap->oap_async_flags = 0;
1848 oap->oap_interrupted = 0;
1850 if (oap->oap_cmd & OBD_BRW_WRITE) {
1851 osc_process_ar(&cli->cl_ar, xid, rc);
1852 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1855 if (rc == 0 && oa != NULL) {
1856 if (oa->o_valid & OBD_MD_FLBLOCKS)
1857 oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1858 if (oa->o_valid & OBD_MD_FLMTIME)
1859 oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1860 if (oa->o_valid & OBD_MD_FLATIME)
1861 oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1862 if (oa->o_valid & OBD_MD_FLCTIME)
1863 oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1867 osc_exit_cache(cli, oap, sent);
1868 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1869 oap->oap_oig = NULL;
1874 rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1875 oap->oap_cmd, oa, rc);
1877 /* ll_ap_completion (from llite) drops PG_locked. so, a new
1878 * I/O on the page could start, but OSC calls it under lock
1879 * and thus we can add oap back to pending safely */
1881 /* upper layer wants to leave the page on pending queue */
1882 osc_oap_to_pending(oap);
1884 osc_exit_cache(cli, oap, sent);
1888 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1890 struct osc_brw_async_args *aa = data;
1891 struct osc_async_page *oap, *tmp;
1892 struct client_obd *cli;
1895 rc = osc_brw_fini_request(request, rc);
1896 CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1898 if (osc_recoverable_error(rc)) {
1899 rc = osc_brw_redo_request(request, aa);
1905 client_obd_list_lock(&cli->cl_loi_list_lock);
1906 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1907 * is called so we know whether to go to sync BRWs or wait for more
1908 * RPCs to complete */
1909 if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1910 cli->cl_w_in_flight--;
1912 cli->cl_r_in_flight--;
1914 /* the caller may re-use the oap after the completion call so
1915 * we need to clean it up a little */
1916 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1917 list_del_init(&oap->oap_rpc_item);
1918 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1921 osc_wake_cache_waiters(cli);
1922 osc_check_rpcs(cli);
1923 client_obd_list_unlock(&cli->cl_loi_list_lock);
1925 OBDO_FREE(aa->aa_oa);
1927 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1931 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1932 struct list_head *rpc_list,
1933 int page_count, int cmd)
1935 struct ptlrpc_request *req;
1936 struct brw_page **pga = NULL;
1937 struct osc_brw_async_args *aa;
1938 struct obdo *oa = NULL;
1939 struct obd_async_page_ops *ops = NULL;
1940 void *caller_data = NULL;
1941 struct osc_async_page *oap;
1945 LASSERT(!list_empty(rpc_list));
1947 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1949 RETURN(ERR_PTR(-ENOMEM));
1953 GOTO(out, req = ERR_PTR(-ENOMEM));
1956 list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1958 ops = oap->oap_caller_ops;
1959 caller_data = oap->oap_caller_data;
1961 pga[i] = &oap->oap_brw_page;
1962 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1963 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1964 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1968 /* always get the data for the obdo for the rpc */
1969 LASSERT(ops != NULL);
1970 ops->ap_fill_obdo(caller_data, cmd, oa);
1972 sort_brw_pages(pga, page_count);
1973 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1975 CERROR("prep_req failed: %d\n", rc);
1976 GOTO(out, req = ERR_PTR(rc));
1979 /* Need to update the timestamps after the request is built in case
1980 * we race with setattr (locally or in queue at OST). If OST gets
1981 * later setattr before earlier BRW (as determined by the request xid),
1982 * the OST will not use BRW timestamps. Sadly, there is no obvious
1983 * way to do this in a single call. bug 10150 */
1984 ops->ap_update_obdo(caller_data, cmd, oa,
1985 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1987 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1988 aa = (struct osc_brw_async_args *)&req->rq_async_args;
1989 INIT_LIST_HEAD(&aa->aa_oaps);
1990 list_splice(rpc_list, &aa->aa_oaps);
1991 INIT_LIST_HEAD(rpc_list);
1998 OBD_FREE(pga, sizeof(*pga) * page_count);
2003 /* the loi lock is held across this function but it's allowed to release
2004 * and reacquire it during its work */
2005 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2006 int cmd, struct loi_oap_pages *lop)
2008 struct ptlrpc_request *req;
2009 obd_count page_count = 0;
2010 struct osc_async_page *oap = NULL, *tmp;
2011 struct osc_brw_async_args *aa;
2012 struct obd_async_page_ops *ops;
2013 CFS_LIST_HEAD(rpc_list);
2014 unsigned int ending_offset;
2015 unsigned starting_offset = 0;
2019 /* first we find the pages we're allowed to work with */
2020 list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2021 ops = oap->oap_caller_ops;
2023 LASSERT(oap->oap_magic == OAP_MAGIC);
2025 if (page_count != 0 &&
2026 srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2027 CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2028 " oap %p, page %p, srvlock %u\n",
2029 oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2032 /* in llite being 'ready' equates to the page being locked
2033 * until completion unlocks it. commit_write submits a page
2034 * as not ready because its unlock will happen unconditionally
2035 * as the call returns. if we race with commit_write giving
2036 * us that page we don't want to create a hole in the page
2037 * stream, so we stop and leave the rpc to be fired by
2038 * another dirtier or kupdated interval (the not ready page
2039 * will still be on the dirty list). we could call in
2040 * at the end of ll_file_write to process the queue again. */
2041 if (!(oap->oap_async_flags & ASYNC_READY)) {
2042 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2044 CDEBUG(D_INODE, "oap %p page %p returned %d "
2045 "instead of ready\n", oap,
2049 /* llite is telling us that the page is still
2050 * in commit_write and that we should try
2051 * and put it in an rpc again later. we
2052 * break out of the loop so we don't create
2053 * a hole in the sequence of pages in the rpc
2058 /* the io isn't needed.. tell the checks
2059 * below to complete the rpc with EINTR */
2060 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2061 oap->oap_count = -EINTR;
2064 oap->oap_async_flags |= ASYNC_READY;
2067 LASSERTF(0, "oap %p page %p returned %d "
2068 "from make_ready\n", oap,
2075 /*
2076 * Page submitted for IO has to be locked. Either by
2077 * ->ap_make_ready() or by higher layers.
2078 *
2079 * XXX nikita: this assertion should be adjusted when lustre
2080 * starts using PG_writeback for pages being written out.
2081 */
2082 #if defined(__KERNEL__) && defined(__LINUX__)
2083 LASSERT(PageLocked(oap->oap_page));
2084 #endif
2085 /* If there is a gap at the start of this page, it can't merge
2086 * with any previous page, so we'll hand the network a
2087 * "fragmented" page array that it can't transfer in 1 RDMA */
2088 if (page_count != 0 && oap->oap_page_off != 0)
2091 /* take the page out of our book-keeping */
2092 list_del_init(&oap->oap_pending_item);
2093 lop_update_pending(cli, lop, cmd, -1);
2094 list_del_init(&oap->oap_urgent_item);
2096 if (page_count == 0)
2097 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2098 (PTLRPC_MAX_BRW_SIZE - 1);
2100 /* ask the caller for the size of the io as the rpc leaves. */
2101 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2103 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2104 if (oap->oap_count <= 0) {
2105 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2107 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2111 /* now put the page back in our accounting */
2112 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2113 if (page_count == 0)
2114 srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2115 if (++page_count >= cli->cl_max_pages_per_rpc)
2118 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2119 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2120 * have the same alignment as the initial writes that allocated
2121 * extents on the server. */
2122 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2123 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2124 if (ending_offset == 0)
2127 /* If there is a gap at the end of this page, it can't merge
2128 * with any subsequent pages, so we'll hand the network a
2129 * "fragmented" page array that it can't transfer in 1 RDMA */
2130 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2134 osc_wake_cache_waiters(cli);
2136 if (page_count == 0)
2139 loi_list_maint(cli, loi);
2141 client_obd_list_unlock(&cli->cl_loi_list_lock);
2143 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2145 /* this should happen rarely and is pretty bad; it makes the
2146 * pending list not follow the dirty order */
2147 client_obd_list_lock(&cli->cl_loi_list_lock);
2148 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2149 list_del_init(&oap->oap_rpc_item);
2151 /* queued sync pages can be torn down while they sit
2152 * between the pending list and the rpc */
2153 if (oap->oap_interrupted) {
2154 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2155 osc_ap_completion(cli, NULL, oap, 0,
2159 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2161 loi_list_maint(cli, loi);
2162 RETURN(PTR_ERR(req));
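/* the rpc was built successfully; account for it in the lprocfs
 * page-count, rpcs-in-flight and starting-offset histograms */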
2165 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2166 if (cmd == OBD_BRW_READ) {
2167 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2168 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2169 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2170 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2171 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2173 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2174 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2175 cli->cl_w_in_flight);
2176 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2177 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2178 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2181 client_obd_list_lock(&cli->cl_loi_list_lock);
2183 if (cmd == OBD_BRW_READ)
2184 cli->cl_r_in_flight++;
2186 cli->cl_w_in_flight++;
2188 /* queued sync pages can be torn down while they sit
2189 * between the pending list and the rpc */
2191 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2192 /* only one oap gets a request reference */
2195 if (oap->oap_interrupted && !req->rq_intr) {
2196 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2198 ptlrpc_mark_interrupted(req);
2202 tmp->oap_request = ptlrpc_request_addref(req);
2204 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2205 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2207 req->rq_interpret_reply = brw_interpret_oap;
2208 ptlrpcd_add_req(req);
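/* debugging aid: dumps an object's readiness and per-direction
 * pending/urgent state, e.g.
 * LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)); */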
2212 #define LOI_DEBUG(LOI, STR, args...) \
2213 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2214 !list_empty(&(LOI)->loi_cli_item), \
2215 (LOI)->loi_write_lop.lop_num_pending, \
2216 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2217 (LOI)->loi_read_lop.lop_num_pending, \
2218 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2221 /* This is called by osc_check_rpcs() to find which objects have pages that
2222 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2223 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2226 /* first return all objects which we already know to have
2227 * pages ready to be stuffed into rpcs */
2228 if (!list_empty(&cli->cl_loi_ready_list))
2229 RETURN(list_entry(cli->cl_loi_ready_list.next,
2230 struct lov_oinfo, loi_cli_item));
2232 /* then if we have cache waiters, return all objects with queued
2233 * writes. This is especially important when many small files
2234 * have filled up the cache and not been fired into rpcs because
2235 * they don't pass the nr_pending/object threshold */
2236 if (!list_empty(&cli->cl_cache_waiters) &&
2237 !list_empty(&cli->cl_loi_write_list))
2238 RETURN(list_entry(cli->cl_loi_write_list.next,
2239 struct lov_oinfo, loi_write_item));
2241 /* then return all queued objects when we have an invalid import
2242 * so that they get flushed */
2243 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2244 if (!list_empty(&cli->cl_loi_write_list))
2245 RETURN(list_entry(cli->cl_loi_write_list.next,
2246 struct lov_oinfo, loi_write_item));
2247 if (!list_empty(&cli->cl_loi_read_list))
2248 RETURN(list_entry(cli->cl_loi_read_list.next,
2249 struct lov_oinfo, loi_read_item));
2254 /* called with the loi list lock held */
2255 static void osc_check_rpcs(struct client_obd *cli)
2257 struct lov_oinfo *loi;
2258 int rc = 0, race_counter = 0;
2261 while ((loi = osc_next_loi(cli)) != NULL) {
2262 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2264 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2267 /* attempt some read/write balancing by alternating between
2268 * reads and writes in an object. The makes_rpc checks here
2269 * would be redundant if we were getting read/write work items
2270 * instead of objects. we don't want send_oap_rpc to drain a
2271 * partial read pending queue when we're given this object to
2272 * do write io while there are cache waiters */
2273 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2274 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2275 &loi->loi_write_lop);
2283 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2284 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2285 &loi->loi_read_lop);
2294 /* attempt some inter-object balancing by issuing rpcs
2295 * for each object in turn */
2296 if (!list_empty(&loi->loi_cli_item))
2297 list_del_init(&loi->loi_cli_item);
2298 if (!list_empty(&loi->loi_write_item))
2299 list_del_init(&loi->loi_write_item);
2300 if (!list_empty(&loi->loi_read_item))
2301 list_del_init(&loi->loi_read_item);
2303 loi_list_maint(cli, loi);
2305 /* send_oap_rpc fails with 0 when make_ready tells it to
2306 * back off. llite's make_ready does this when it tries
2307 * to lock a page queued for write that is already locked.
2308 * we want to try sending rpcs from many objects, but we
2309 * don't want to spin failing with 0. */
2310 if (race_counter == 10)
2316 /* we're trying to queue a page in the osc so we're subject to the
2317 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2318 * If the osc's queued pages are already at that limit, then we want to sleep
2319 * until there is space in the osc's queue for us. We also may be waiting for
2320 * write credits from the OST if there are RPCs in flight that may return some
2321 * before we fall back to sync writes.
2323 * We need this to know our allocation was granted in the presence of signals */
2324 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2328 client_obd_list_lock(&cli->cl_loi_list_lock);
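/* granted once osc_wake_cache_waiters() has removed our entry from
 * cl_cache_waiters; if no rpcs are in flight there is nothing to wait
 * for, so wake up and let the caller fall back to sync io */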
2329 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2330 client_obd_list_unlock(&cli->cl_loi_list_lock);
2334 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2335 * grant or cache space. */
2336 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2337 struct osc_async_page *oap)
2339 struct osc_cache_waiter ocw;
2340 struct l_wait_info lwi = { 0 };
2343 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2344 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2345 cli->cl_dirty_max, obd_max_dirty_pages,
2346 cli->cl_lost_grant, cli->cl_avail_grant);
2348 /* force the caller to try sync io. this can jump the list
2349 * of queued writes and create a discontiguous rpc stream */
2350 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2351 loi->loi_ar.ar_force_sync)
2354 /* Hopefully normal case - cache space and write credits available */
2355 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2356 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2357 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2358 /* account for ourselves */
2359 osc_consume_write_grant(cli, &oap->oap_brw_page);
2363 /* Make sure that there are write rpcs in flight to wait for. This
2364 * is a little silly as this object may not have any pending
2365 * writes, but other objects surely might. */
2366 if (cli->cl_w_in_flight) {
2367 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2368 cfs_waitq_init(&ocw.ocw_waitq);
2372 loi_list_maint(cli, loi);
2373 osc_check_rpcs(cli);
2374 client_obd_list_unlock(&cli->cl_loi_list_lock);
2376 CDEBUG(D_CACHE, "sleeping for cache space\n");
2377 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2379 client_obd_list_lock(&cli->cl_loi_list_lock);
2380 if (!list_empty(&ocw.ocw_entry)) {
2381 list_del(&ocw.ocw_entry);
2390 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2391 struct lov_oinfo *loi, cfs_page_t *page,
2392 obd_off offset, struct obd_async_page_ops *ops,
2393 void *data, void **res)
2395 struct osc_async_page *oap;
2399 return size_round(sizeof(*oap));
2402 oap->oap_magic = OAP_MAGIC;
2403 oap->oap_cli = &exp->exp_obd->u.cli;
2406 oap->oap_caller_ops = ops;
2407 oap->oap_caller_data = data;
2409 oap->oap_page = page;
2410 oap->oap_obj_off = offset;
2412 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2413 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2414 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2416 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2418 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
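/* convert an async page cookie (handed back through the *res argument
 * of osc_prep_async_page()) into an oap, validating the magic */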
2422 struct osc_async_page *oap_from_cookie(void *cookie)
2424 struct osc_async_page *oap = cookie;
2425 if (oap->oap_magic != OAP_MAGIC)
2426 return ERR_PTR(-EINVAL);
2430 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2431 struct lov_oinfo *loi, void *cookie,
2432 int cmd, obd_off off, int count,
2433 obd_flag brw_flags, enum async_flags async_flags)
2435 struct client_obd *cli = &exp->exp_obd->u.cli;
2436 struct osc_async_page *oap;
2440 oap = oap_from_cookie(cookie);
2442 RETURN(PTR_ERR(oap));
2444 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2447 if (!list_empty(&oap->oap_pending_item) ||
2448 !list_empty(&oap->oap_urgent_item) ||
2449 !list_empty(&oap->oap_rpc_item))
2452 /* check if the file's owner/group is over quota */
2453 #ifdef HAVE_QUOTA_SUPPORT
2454 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2455 struct obd_async_page_ops *ops;
2462 ops = oap->oap_caller_ops;
2463 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2464 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2475 loi = lsm->lsm_oinfo[0];
2477 client_obd_list_lock(&cli->cl_loi_list_lock);
2480 oap->oap_page_off = off;
2481 oap->oap_count = count;
2482 oap->oap_brw_flags = brw_flags;
2483 oap->oap_async_flags = async_flags;
2485 if (cmd & OBD_BRW_WRITE) {
2486 rc = osc_enter_cache(cli, loi, oap);
2488 client_obd_list_unlock(&cli->cl_loi_list_lock);
2493 osc_oap_to_pending(oap);
2494 loi_list_maint(cli, loi);
2496 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2499 osc_check_rpcs(cli);
2500 client_obd_list_unlock(&cli->cl_loi_list_lock);
2505 /* aka (~was & now & flag), but this is more clear :) */
2506 #define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
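/* e.g. with was = ASYNC_READY and now = (ASYNC_READY | ASYNC_URGENT),
 * SETTING(was, now, ASYNC_URGENT) is true (the flag is newly set) while
 * SETTING(was, now, ASYNC_READY) is false (it was already set) */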
2508 static int osc_set_async_flags(struct obd_export *exp,
2509 struct lov_stripe_md *lsm,
2510 struct lov_oinfo *loi, void *cookie,
2511 obd_flag async_flags)
2513 struct client_obd *cli = &exp->exp_obd->u.cli;
2514 struct loi_oap_pages *lop;
2515 struct osc_async_page *oap;
2519 oap = oap_from_cookie(cookie);
2521 RETURN(PTR_ERR(oap));
2524 /* bug 7311: OST-side locking is only supported for liblustre for now
2525 * (and liblustre never calls obd_set_async_flags(), I hope); a
2526 * generic implementation has to handle the case where an
2527 * OST-locked page was picked up by, e.g., ->writepage(). */
2529 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2530 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2531 * tread here */
2533 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2537 loi = lsm->lsm_oinfo[0];
2539 if (oap->oap_cmd & OBD_BRW_WRITE) {
2540 lop = &loi->loi_write_lop;
2542 lop = &loi->loi_read_lop;
2545 client_obd_list_lock(&cli->cl_loi_list_lock);
2547 if (list_empty(&oap->oap_pending_item))
2548 GOTO(out, rc = -EINVAL);
2550 if ((oap->oap_async_flags & async_flags) == async_flags)
2553 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2554 oap->oap_async_flags |= ASYNC_READY;
2556 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2557 if (list_empty(&oap->oap_rpc_item)) {
2558 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2559 loi_list_maint(cli, loi);
2563 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2564 oap->oap_async_flags);
2566 osc_check_rpcs(cli);
2567 client_obd_list_unlock(&cli->cl_loi_list_lock);
2571 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2572 struct lov_oinfo *loi,
2573 struct obd_io_group *oig, void *cookie,
2574 int cmd, obd_off off, int count,
2576 obd_flag async_flags)
2578 struct client_obd *cli = &exp->exp_obd->u.cli;
2579 struct osc_async_page *oap;
2580 struct loi_oap_pages *lop;
2584 oap = oap_from_cookie(cookie);
2586 RETURN(PTR_ERR(oap));
2588 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2591 if (!list_empty(&oap->oap_pending_item) ||
2592 !list_empty(&oap->oap_urgent_item) ||
2593 !list_empty(&oap->oap_rpc_item))
2597 loi = lsm->lsm_oinfo[0];
2599 client_obd_list_lock(&cli->cl_loi_list_lock);
2602 oap->oap_page_off = off;
2603 oap->oap_count = count;
2604 oap->oap_brw_flags = brw_flags;
2605 oap->oap_async_flags = async_flags;
2607 if (cmd & OBD_BRW_WRITE)
2608 lop = &loi->loi_write_lop;
2610 lop = &loi->loi_read_lop;
2612 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2613 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2615 rc = oig_add_one(oig, &oap->oap_occ);
2618 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2619 oap, oap->oap_page, rc);
2621 client_obd_list_unlock(&cli->cl_loi_list_lock);
2626 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2627 struct loi_oap_pages *lop, int cmd)
2629 struct list_head *pos, *tmp;
2630 struct osc_async_page *oap;
2632 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2633 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2634 list_del(&oap->oap_pending_item);
2635 osc_oap_to_pending(oap);
2637 loi_list_maint(cli, loi);
2640 static int osc_trigger_group_io(struct obd_export *exp,
2641 struct lov_stripe_md *lsm,
2642 struct lov_oinfo *loi,
2643 struct obd_io_group *oig)
2645 struct client_obd *cli = &exp->exp_obd->u.cli;
2649 loi = lsm->lsm_oinfo[0];
2651 client_obd_list_lock(&cli->cl_loi_list_lock);
2653 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2654 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2656 osc_check_rpcs(cli);
2657 client_obd_list_unlock(&cli->cl_loi_list_lock);
2662 static int osc_teardown_async_page(struct obd_export *exp,
2663 struct lov_stripe_md *lsm,
2664 struct lov_oinfo *loi, void *cookie)
2666 struct client_obd *cli = &exp->exp_obd->u.cli;
2667 struct loi_oap_pages *lop;
2668 struct osc_async_page *oap;
2672 oap = oap_from_cookie(cookie);
2674 RETURN(PTR_ERR(oap));
2677 loi = lsm->lsm_oinfo[0];
2679 if (oap->oap_cmd & OBD_BRW_WRITE) {
2680 lop = &loi->loi_write_lop;
2682 lop = &loi->loi_read_lop;
2685 client_obd_list_lock(&cli->cl_loi_list_lock);
2687 if (!list_empty(&oap->oap_rpc_item))
2688 GOTO(out, rc = -EBUSY);
2690 osc_exit_cache(cli, oap, 0);
2691 osc_wake_cache_waiters(cli);
2693 if (!list_empty(&oap->oap_urgent_item)) {
2694 list_del_init(&oap->oap_urgent_item);
2695 oap->oap_async_flags &= ~ASYNC_URGENT;
2697 if (!list_empty(&oap->oap_pending_item)) {
2698 list_del_init(&oap->oap_pending_item);
2699 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2701 loi_list_maint(cli, loi);
2703 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2705 client_obd_list_unlock(&cli->cl_loi_list_lock);
2709 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2712 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2715 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2718 lock_res_and_lock(lock);
2721 /* Liang XXX: Darwin and Winnt checking should be added */
2722 if (lock->l_ast_data && lock->l_ast_data != data) {
2723 struct inode *new_inode = data;
2724 struct inode *old_inode = lock->l_ast_data;
2725 if (!(old_inode->i_state & I_FREEING))
2726 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2727 LASSERTF(old_inode->i_state & I_FREEING,
2728 "Found existing inode %p/%lu/%u state %lu in lock: "
2729 "setting data to %p/%lu/%u\n", old_inode,
2730 old_inode->i_ino, old_inode->i_generation,
2732 new_inode, new_inode->i_ino, new_inode->i_generation);
2736 lock->l_ast_data = data;
2737 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2738 unlock_res_and_lock(lock);
2739 LDLM_LOCK_PUT(lock);
2742 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2743 ldlm_iterator_t replace, void *data)
2745 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2746 struct obd_device *obd = class_exp2obd(exp);
2748 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2752 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2758 /* The request was created before the ldlm_cli_enqueue call. */
2759 if (rc == ELDLM_LOCK_ABORTED) {
2760 struct ldlm_reply *rep;
2762 /* swabbed by ldlm_cli_enqueue() */
2763 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2764 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2766 LASSERT(rep != NULL);
2767 if (rep->lock_policy_res1)
2768 rc = rep->lock_policy_res1;
2772 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2773 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2774 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2775 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2776 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2779 /* Call the update callback. */
2780 rc = oinfo->oi_cb_up(oinfo, rc);
2784 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2785 struct osc_enqueue_args *aa, int rc)
2787 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2788 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2789 struct ldlm_lock *lock;
2791 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2792 * be valid. */
2793 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2795 /* Complete obtaining the lock procedure. */
2796 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2798 &aa->oa_oi->oi_flags,
2799 &lsm->lsm_oinfo[0]->loi_lvb,
2800 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2801 lustre_swab_ost_lvb,
2802 aa->oa_oi->oi_lockh, rc);
2804 /* Complete osc stuff. */
2805 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2807 /* Release the lock for async request. */
2808 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2809 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2811 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2812 aa->oa_oi->oi_lockh, req, aa);
2813 LDLM_LOCK_PUT(lock);
2817 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2818 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2819 * other synchronous requests; however, keeping some locks while trying to obtain
2820 * others may take a considerable amount of time in the case of OST failure, and
2821 * when a client does not release a lock that other sync requests are waiting on,
2822 * the client is evicted from the cluster -- such scenarios make life difficult,
2823 * so release locks just after they are obtained. */
2824 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2825 struct ldlm_enqueue_info *einfo,
2826 struct ptlrpc_request_set *rqset)
2828 struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2829 struct obd_device *obd = exp->exp_obd;
2830 struct ldlm_reply *rep;
2831 struct ptlrpc_request *req = NULL;
2832 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2836 /* Filesystem lock extents are extended to page boundaries so that
2837 * dealing with the page cache is a little smoother. */
2838 oinfo->oi_policy.l_extent.start -=
2839 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2840 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
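/* a worked example, assuming 4096-byte CFS_PAGE_SIZE: a requested
 * extent [5000, 9000] is widened to [4096, 12287], i.e. rounded out
 * to whole pages */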
2842 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2845 /* Next, search for already existing extent locks that will cover us */
2846 rc = ldlm_lock_match(obd->obd_namespace,
2847 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2848 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2851 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2854 /* I would like to be able to ASSERT here that rss <=
2855 * kms, but I can't, for reasons which are explained in
2856 * lov_enqueue() */
2859 /* We already have a lock, and it's referenced */
2860 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2862 /* For async requests, decref the lock. */
2864 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2869 /* If we're trying to read, we also search for an existing PW lock. The
2870 * VFS and page cache already protect us locally, so lots of readers/
2871 * writers can share a single PW lock.
2873 * There are problems with conversion deadlocks, so instead of
2874 * converting a read lock to a write lock, we'll just enqueue a new
2875 * one.
2877 * At some point we should cancel the read lock instead of making them
2878 * send us a blocking callback, but there are problems with canceling
2879 * locks out from other users right now, too. */
2881 if (einfo->ei_mode == LCK_PR) {
2882 rc = ldlm_lock_match(obd->obd_namespace,
2883 oinfo->oi_flags | LDLM_FL_LVB_READY,
2884 &res_id, einfo->ei_type, &oinfo->oi_policy,
2885 LCK_PW, oinfo->oi_lockh);
2887 /* FIXME: This is not incredibly elegant, but it might
2888 * be more elegant than adding another parameter to
2889 * lock_match. I want a second opinion. */
2890 /* addref the lock only if not async requests. */
2892 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2893 osc_set_data_with_check(oinfo->oi_lockh,
2896 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2897 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2905 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2906 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2907 [DLM_LOCKREQ_OFF + 1] = 0 };
2909 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2913 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2914 size[DLM_REPLY_REC_OFF] =
2915 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2916 ptlrpc_req_set_repsize(req, 3, size);
2919 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2920 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2922 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
2923 &oinfo->oi_policy, &oinfo->oi_flags,
2924 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2925 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2926 lustre_swab_ost_lvb, oinfo->oi_lockh,
2930 struct osc_enqueue_args *aa;
2931 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2932 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2937 req->rq_interpret_reply = osc_enqueue_interpret;
2938 ptlrpc_set_add_req(rqset, req);
2939 } else if (intent) {
2940 ptlrpc_req_finished(req);
2945 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2947 ptlrpc_req_finished(req);
2952 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2953 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2954 int *flags, void *data, struct lustre_handle *lockh)
2956 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2957 struct obd_device *obd = exp->exp_obd;
2959 int lflags = *flags;
2962 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2964 /* Filesystem lock extents are extended to page boundaries so that
2965 * dealing with the page cache is a little smoother */
2966 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2967 policy->l_extent.end |= ~CFS_PAGE_MASK;
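/* same page-boundary rounding as in osc_enqueue() above */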
2969 /* Next, search for already existing extent locks that will cover us */
2970 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2971 &res_id, type, policy, mode, lockh);
2973 //if (!(*flags & LDLM_FL_TEST_LOCK))
2974 osc_set_data_with_check(lockh, data, lflags);
2977 /* If we're trying to read, we also search for an existing PW lock. The
2978 * VFS and page cache already protect us locally, so lots of readers/
2979 * writers can share a single PW lock. */
2980 if (mode == LCK_PR) {
2981 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2983 policy, LCK_PW, lockh);
2984 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2985 /* FIXME: This is not incredibly elegant, but it might
2986 * be more elegant than adding another parameter to
2987 * lock_match. I want a second opinion. */
2988 osc_set_data_with_check(lockh, data, lflags);
2989 ldlm_lock_addref(lockh, LCK_PR);
2990 ldlm_lock_decref(lockh, LCK_PW);
2996 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2997 __u32 mode, struct lustre_handle *lockh)
3001 if (unlikely(mode == LCK_GROUP))
3002 ldlm_lock_decref_and_cancel(lockh, mode);
3004 ldlm_lock_decref(lockh, mode);
3009 static int osc_cancel_unused(struct obd_export *exp,
3010 struct lov_stripe_md *lsm, int flags, void *opaque)
3012 struct obd_device *obd = class_exp2obd(exp);
3013 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3015 return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
3019 static int osc_join_lru(struct obd_export *exp,
3020 struct lov_stripe_md *lsm, int join)
3022 struct obd_device *obd = class_exp2obd(exp);
3023 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3025 return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
3028 static int osc_statfs_interpret(struct ptlrpc_request *req,
3029 struct osc_async_args *aa, int rc)
3031 struct obd_statfs *msfs;
3037 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3038 lustre_swab_obd_statfs);
3040 CERROR("Can't unpack obd_statfs\n");
3041 GOTO(out, rc = -EPROTO);
3044 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3046 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3050 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3051 __u64 max_age, struct ptlrpc_request_set *rqset)
3053 struct ptlrpc_request *req;
3054 struct osc_async_args *aa;
3055 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3058 /* We could possibly pass max_age in the request (as an absolute
3059 * timestamp or a "seconds.usec ago") so the target can avoid doing
3060 * extra calls into the filesystem if that isn't necessary (e.g.
3061 * during mount that would help a bit). Having relative timestamps
3062 * is not so great if request processing is slow, while absolute
3063 * timestamps are not ideal because they need time synchronization. */
3064 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3065 OST_STATFS, 1, NULL, NULL);
3069 ptlrpc_req_set_repsize(req, 2, size);
3070 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3072 req->rq_interpret_reply = osc_statfs_interpret;
3073 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3074 aa = (struct osc_async_args *)&req->rq_async_args;
3077 ptlrpc_set_add_req(rqset, req);
3081 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3084 struct obd_statfs *msfs;
3085 struct ptlrpc_request *req;
3086 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3089 /* We could possibly pass max_age in the request (as an absolute
3090 * timestamp or a "seconds.usec ago") so the target can avoid doing
3091 * extra calls into the filesystem if that isn't necessary (e.g.
3092 * during mount that would help a bit). Having relative timestamps
3093 * is not so great if request processing is slow, while absolute
3094 * timestamps are not ideal because they need time synchronization. */
3095 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3096 OST_STATFS, 1, NULL, NULL);
3100 ptlrpc_req_set_repsize(req, 2, size);
3101 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3103 rc = ptlrpc_queue_wait(req);
3107 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3108 lustre_swab_obd_statfs);
3110 CERROR("Can't unpack obd_statfs\n");
3111 GOTO(out, rc = -EPROTO);
3114 memcpy(osfs, msfs, sizeof(*osfs));
3118 ptlrpc_req_finished(req);
3122 /* Retrieve object striping information.
3124 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3125 * the maximum number of OST indices which will fit in the user buffer.
3126 * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here). */
3128 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3130 struct lov_user_md lum, *lumk;
3131 int rc = 0, lum_size;
3137 if (copy_from_user(&lum, lump, sizeof(lum)))
3140 if (lum.lmm_magic != LOV_USER_MAGIC)
3143 if (lum.lmm_stripe_count > 0) {
3144 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3145 OBD_ALLOC(lumk, lum_size);
3149 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3151 lum_size = sizeof(lum);
3155 lumk->lmm_object_id = lsm->lsm_object_id;
3156 lumk->lmm_stripe_count = 1;
3158 if (copy_to_user(lump, lumk, lum_size))
3162 OBD_FREE(lumk, lum_size);
3168 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3169 void *karg, void *uarg)
3171 struct obd_device *obd = exp->exp_obd;
3172 struct obd_ioctl_data *data = karg;
3176 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3179 if (!try_module_get(THIS_MODULE)) {
3180 CERROR("Can't get module. Is it alive?");
3185 case OBD_IOC_LOV_GET_CONFIG: {
3187 struct lov_desc *desc;
3188 struct obd_uuid uuid;
3192 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3193 GOTO(out, err = -EINVAL);
3195 data = (struct obd_ioctl_data *)buf;
3197 if (sizeof(*desc) > data->ioc_inllen1) {
3198 obd_ioctl_freedata(buf, len);
3199 GOTO(out, err = -EINVAL);
3202 if (data->ioc_inllen2 < sizeof(uuid)) {
3203 obd_ioctl_freedata(buf, len);
3204 GOTO(out, err = -EINVAL);
3207 desc = (struct lov_desc *)data->ioc_inlbuf1;
3208 desc->ld_tgt_count = 1;
3209 desc->ld_active_tgt_count = 1;
3210 desc->ld_default_stripe_count = 1;
3211 desc->ld_default_stripe_size = 0;
3212 desc->ld_default_stripe_offset = 0;
3213 desc->ld_pattern = 0;
3214 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3216 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3218 err = copy_to_user((void *)uarg, buf, len);
3221 obd_ioctl_freedata(buf, len);
3224 case LL_IOC_LOV_SETSTRIPE:
3225 err = obd_alloc_memmd(exp, karg);
3229 case LL_IOC_LOV_GETSTRIPE:
3230 err = osc_getstripe(karg, uarg);
3232 case OBD_IOC_CLIENT_RECOVER:
3233 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3238 case IOC_OSC_SET_ACTIVE:
3239 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3242 case OBD_IOC_POLL_QUOTACHECK:
3243 err = lquota_poll_check(quota_interface, exp,
3244 (struct if_quotacheck *)karg);
3246 case OBD_IOC_DESTROY: {
3249 if (!capable(CAP_SYS_ADMIN))
3250 GOTO(out, err = -EPERM);
3251 oa = &data->ioc_obdo1;
3252 oa->o_valid |= OBD_MD_FLGROUP;
3254 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3258 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3259 cmd, cfs_curproc_comm());
3260 GOTO(out, err = -ENOTTY);
3263 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3266 module_put(THIS_MODULE);
3271 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3272 void *key, __u32 *vallen, void *val)
3275 if (!vallen || !val)
3278 if (keylen > strlen("lock_to_stripe") &&
3279 strcmp(key, "lock_to_stripe") == 0) {
3280 __u32 *stripe = val;
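/* the osc manages a single-stripe object, so a lock always maps to
 * stripe 0 */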
3281 *vallen = sizeof(*stripe);
3284 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3285 struct ptlrpc_request *req;
3287 char *bufs[2] = { NULL, key };
3288 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3290 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3291 OST_GET_INFO, 2, size, bufs);
3295 size[REPLY_REC_OFF] = *vallen;
3296 ptlrpc_req_set_repsize(req, 2, size);
3297 rc = ptlrpc_queue_wait(req);
3301 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3302 lustre_swab_ost_last_id);
3303 if (reply == NULL) {
3304 CERROR("Can't unpack OST last ID\n");
3305 GOTO(out, rc = -EPROTO);
3307 *((obd_id *)val) = *reply;
3309 ptlrpc_req_finished(req);
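/* reply handler for the mds_conn setinfo rpc: (re)connect the llog
 * initiator and mark the import as pingable with server-side timeouts,
 * so the MDS OSC keeps watching the OST */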
3315 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3318 struct llog_ctxt *ctxt;
3319 struct obd_import *imp = req->rq_import;
3325 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3328 rc = llog_initiator_connect(ctxt);
3330 CERROR("cannot establish connection for "
3331 "ctxt %p: %d\n", ctxt, rc);
3334 llog_ctxt_put(ctxt);
3335 spin_lock(&imp->imp_lock);
3336 imp->imp_server_timeout = 1;
3337 imp->imp_pingable = 1;
3338 spin_unlock(&imp->imp_lock);
3339 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3344 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3345 void *key, obd_count vallen, void *val,
3346 struct ptlrpc_request_set *set)
3348 struct ptlrpc_request *req;
3349 struct obd_device *obd = exp->exp_obd;
3350 struct obd_import *imp = class_exp2cliimp(exp);
3351 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3352 char *bufs[3] = { NULL, key, val };
3355 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3357 if (KEY_IS(KEY_NEXT_ID)) {
3358 if (vallen != sizeof(obd_id))
3360 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3361 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3362 exp->exp_obd->obd_name,
3363 obd->u.cli.cl_oscc.oscc_next_id);
3368 if (KEY_IS("unlinked")) {
3369 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3370 spin_lock(&oscc->oscc_lock);
3371 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3372 spin_unlock(&oscc->oscc_lock);
3376 if (KEY_IS(KEY_INIT_RECOV)) {
3377 if (vallen != sizeof(int))
3379 spin_lock(&imp->imp_lock);
3380 imp->imp_initial_recov = *(int *)val;
3381 spin_unlock(&imp->imp_lock);
3382 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3383 exp->exp_obd->obd_name,
3384 imp->imp_initial_recov);
3388 if (KEY_IS("checksum")) {
3389 if (vallen != sizeof(int))
3391 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3398 /* We pass all other commands directly to OST. Since nobody calls osc
3399 methods directly and everybody is supposed to go through LOV, we
3400 assume lov checked invalid values for us.
3401 The only recognised values so far are evict_by_nid and mds_conn.
3402 Even if something bad goes through, we'd get a -EINVAL from OST
3403 anyway. */
3405 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3410 if (KEY_IS(KEY_MDS_CONN))
3411 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3413 ptlrpc_req_set_repsize(req, 1, NULL);
3414 ptlrpc_set_add_req(set, req);
3415 ptlrpc_check_set(set);
3421 static struct llog_operations osc_size_repl_logops = {
3422 lop_cancel: llog_obd_repl_cancel
3425 static struct llog_operations osc_mds_ost_orig_logops;
3426 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3427 int count, struct llog_catid *catid,
3428 struct obd_uuid *uuid)
3433 spin_lock(&obd->obd_dev_lock);
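/* set up the origin logops exactly once, copying llog_lvfs_ops and
 * patching in the origin callbacks; obd_dev_lock keeps concurrent
 * llog_init calls from racing on the shared structure */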
3434 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3435 osc_mds_ost_orig_logops = llog_lvfs_ops;
3436 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3437 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3438 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3439 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3441 spin_unlock(&obd->obd_dev_lock);
3443 rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3444 &catid->lci_logid, &osc_mds_ost_orig_logops);
3446 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3450 rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3451 &osc_size_repl_logops);
3453 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3456 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3457 obd->obd_name, tgt->obd_name, count, catid, rc);
3458 CERROR("logid "LPX64":0x%x\n",
3459 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3464 static int osc_llog_finish(struct obd_device *obd, int count)
3466 struct llog_ctxt *ctxt;
3467 int rc = 0, rc2 = 0;
3470 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3472 rc = llog_cleanup(ctxt);
3474 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3476 rc2 = llog_cleanup(ctxt);
3483 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3484 struct obd_uuid *cluuid,
3485 struct obd_connect_data *data)
3487 struct client_obd *cli = &obd->u.cli;
3489 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3492 client_obd_list_lock(&cli->cl_loi_list_lock);
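/* hand the server whatever grant we still remember; if none is left,
 * ask for two full rpcs worth (gcc's a ?: b picks a when non-zero) */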
3493 data->ocd_grant = cli->cl_avail_grant ?:
3494 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3495 lost_grant = cli->cl_lost_grant;
3496 cli->cl_lost_grant = 0;
3497 client_obd_list_unlock(&cli->cl_loi_list_lock);
3499 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3500 "cl_lost_grant: %ld\n", data->ocd_grant,
3501 cli->cl_avail_grant, lost_grant);
3502 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3503 " ocd_grant: %d\n", data->ocd_connect_flags,
3504 data->ocd_version, data->ocd_grant);
3510 static int osc_disconnect(struct obd_export *exp)
3512 struct obd_device *obd = class_exp2obd(exp);
3513 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3516 if (obd->u.cli.cl_conn_count == 1)
3517 /* flush any remaining cancel messages out to the target */
3518 llog_sync(ctxt, exp);
3520 llog_ctxt_put(ctxt);
3522 rc = client_disconnect_export(exp);
3526 static int osc_import_event(struct obd_device *obd,
3527 struct obd_import *imp,
3528 enum obd_import_event event)
3530 struct client_obd *cli;
3534 LASSERT(imp->imp_obd == obd);
3537 case IMP_EVENT_DISCON: {
3538 /* Only do this on the MDS OSCs */
3539 if (imp->imp_server_timeout) {
3540 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3542 spin_lock(&oscc->oscc_lock);
3543 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3544 spin_unlock(&oscc->oscc_lock);
3547 client_obd_list_lock(&cli->cl_loi_list_lock);
3548 cli->cl_avail_grant = 0;
3549 cli->cl_lost_grant = 0;
3550 client_obd_list_unlock(&cli->cl_loi_list_lock);
3551 ptlrpc_import_setasync(imp, -1);
3555 case IMP_EVENT_INACTIVE: {
3556 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3559 case IMP_EVENT_INVALIDATE: {
3560 struct ldlm_namespace *ns = obd->obd_namespace;
3564 client_obd_list_lock(&cli->cl_loi_list_lock);
3565 /* all pages go to failing rpcs due to the invalid import */
3566 osc_check_rpcs(cli);
3567 client_obd_list_unlock(&cli->cl_loi_list_lock);
3569 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3573 case IMP_EVENT_ACTIVE: {
3574 /* Only do this on the MDS OSCs */
3575 if (imp->imp_server_timeout) {
3576 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3578 spin_lock(&oscc->oscc_lock);
3579 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3580 spin_unlock(&oscc->oscc_lock);
3582 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3585 case IMP_EVENT_OCD: {
3586 struct obd_connect_data *ocd = &imp->imp_connect_data;
3588 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3589 osc_init_grant(&obd->u.cli, ocd);
3592 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3593 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3595 ptlrpc_import_setasync(imp, 1);
3596 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3600 CERROR("Unknown import event %d\n", event);
3606 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3612 rc = ptlrpcd_addref();
3616 rc = client_obd_setup(obd, len, buf);
3620 struct lprocfs_static_vars lvars;
3621 struct client_obd *cli = &obd->u.cli;
3623 lprocfs_init_vars(osc, &lvars);
3624 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3625 lproc_osc_attach_seqstat(obd);
3626 ptlrpc_lprocfs_register_obd(obd);
3630 /* We need to allocate a few more requests, because
3631 brw_interpret_oap tries to create new requests before freeing
3632 previous ones. Ideally we would want 2x max_rpcs_in_flight
3633 reserved, but that might be too much wasted RAM in practice,
3634 so 2 extra is just a guess that should still work. */
3635 cli->cl_import->imp_rq_pool =
3636 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3638 ptlrpc_add_rqs_to_pool);
3644 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3650 case OBD_CLEANUP_EARLY: {
3651 struct obd_import *imp;
3652 imp = obd->u.cli.cl_import;
3653 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3654 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3655 ptlrpc_deactivate_import(imp);
3658 case OBD_CLEANUP_EXPORTS: {
3659 /* If we set up but never connected, the
3660 client import will not have been cleaned. */
3661 if (obd->u.cli.cl_import) {
3662 struct obd_import *imp;
3663 imp = obd->u.cli.cl_import;
3664 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3666 ptlrpc_invalidate_import(imp);
3667 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3668 class_destroy_import(imp);
3669 obd->u.cli.cl_import = NULL;
3673 case OBD_CLEANUP_SELF_EXP:
3674 rc = obd_llog_finish(obd, 0);
3676 CERROR("failed to cleanup llogging subsystems\n");
3678 case OBD_CLEANUP_OBD:
3684 int osc_cleanup(struct obd_device *obd)
3686 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3690 ptlrpc_lprocfs_unregister_obd(obd);
3691 lprocfs_obd_cleanup(obd);
3693 spin_lock(&oscc->oscc_lock);
3694 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3695 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3696 spin_unlock(&oscc->oscc_lock);
3698 /* free the memory used by the osc quota cache */
3699 lquota_cleanup(quota_interface, obd);
3701 rc = client_obd_cleanup(obd);
3707 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3709 struct lustre_cfg *lcfg = buf;
3710 struct lprocfs_static_vars lvars;
3713 lprocfs_init_vars(osc, &lvars);
3715 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3719 struct obd_ops osc_obd_ops = {
3720 .o_owner = THIS_MODULE,
3721 .o_setup = osc_setup,
3722 .o_precleanup = osc_precleanup,
3723 .o_cleanup = osc_cleanup,
3724 .o_add_conn = client_import_add_conn,
3725 .o_del_conn = client_import_del_conn,
3726 .o_connect = client_connect_import,
3727 .o_reconnect = osc_reconnect,
3728 .o_disconnect = osc_disconnect,
3729 .o_statfs = osc_statfs,
3730 .o_statfs_async = osc_statfs_async,
3731 .o_packmd = osc_packmd,
3732 .o_unpackmd = osc_unpackmd,
3733 .o_precreate = osc_precreate,
3734 .o_create = osc_create,
3735 .o_destroy = osc_destroy,
3736 .o_getattr = osc_getattr,
3737 .o_getattr_async = osc_getattr_async,
3738 .o_setattr = osc_setattr,
3739 .o_setattr_async = osc_setattr_async,
3741 .o_brw_async = osc_brw_async,
3742 .o_prep_async_page = osc_prep_async_page,
3743 .o_queue_async_io = osc_queue_async_io,
3744 .o_set_async_flags = osc_set_async_flags,
3745 .o_queue_group_io = osc_queue_group_io,
3746 .o_trigger_group_io = osc_trigger_group_io,
3747 .o_teardown_async_page = osc_teardown_async_page,
3748 .o_punch = osc_punch,
3750 .o_enqueue = osc_enqueue,
3751 .o_match = osc_match,
3752 .o_change_cbdata = osc_change_cbdata,
3753 .o_cancel = osc_cancel,
3754 .o_cancel_unused = osc_cancel_unused,
3755 .o_join_lru = osc_join_lru,
3756 .o_iocontrol = osc_iocontrol,
3757 .o_get_info = osc_get_info,
3758 .o_set_info_async = osc_set_info_async,
3759 .o_import_event = osc_import_event,
3760 .o_llog_init = osc_llog_init,
3761 .o_llog_finish = osc_llog_finish,
3762 .o_process_config = osc_process_config,
3764 int __init osc_init(void)
3766 struct lprocfs_static_vars lvars;
3770 lprocfs_init_vars(osc, &lvars);
3772 request_module("lquota");
3773 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3774 lquota_init(quota_interface);
3775 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3777 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3780 if (quota_interface)
3781 PORTAL_SYMBOL_PUT(osc_quota_interface);
3789 static void /*__exit*/ osc_exit(void)
3791 lquota_exit(quota_interface);
3792 if (quota_interface)
3793 PORTAL_SYMBOL_PUT(osc_quota_interface);
3795 class_unregister_type(LUSTRE_OSC_NAME);
3798 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3799 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3800 MODULE_LICENSE("GPL");
3802 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);