/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
 * Author Peter Braam <braam@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 *
 * For testing and management it is treated as an obd_device,
 * although it does not export a full OBD method table (the
 * requests are coming in over the wire, so object target modules
 * do not have a full method table.)
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <libcfs/kp30.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);

atomic_t osc_resend_time;

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        lmm_size = sizeof(**lmmp);

                OBD_FREE(*lmmp, lmm_size);

                OBD_ALLOC(*lmmp, lmm_size);

                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));

        /* XXX LOV_MAGIC etc check? */
        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");

        lsm_size = lov_stripe_md_size(1);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);

        OBD_ALLOC(*lsmp, lsm_size);
        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
        if ((*lsmp)->lsm_oinfo[0] == NULL) {
                OBD_FREE(*lsmp, lsm_size);
        loi_init((*lsmp)->lsm_oinfo[0]);

        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
        (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
        LASSERT((*lsmp)->lsm_object_id);
        LASSERT((*lsmp)->lsm_object_gr);

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
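
/*
 * Illustrative sketch (not part of the original source): round-trip the
 * metadata through osc_packmd()/osc_unpackmd().  The exp and lsm values
 * here are hypothetical; the point is that the object id/group cross the
 * pack/unpack boundary via cpu_to_le64()/le64_to_cpu(), so the on-disk
 * format is little-endian regardless of host endianness.
 */
#if 0
static void osc_md_roundtrip_sketch(struct obd_export *exp,
                                    struct lov_stripe_md *lsm)
{
        struct lov_mds_md *lmm = NULL;
        struct lov_stripe_md *lsm2 = NULL;
        int lmm_size, rc;

        lmm_size = osc_packmd(exp, &lmm, lsm);        /* allocates *lmm */
        if (lmm_size < 0)
                return;
        rc = osc_unpackmd(exp, &lsm2, lmm, lmm_size); /* allocates *lsm2 */
        if (rc >= 0)
                LASSERT(lsm2->lsm_object_id == lsm->lsm_object_id);
}
#endif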

static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
                                 struct ost_body *body, void *capa)
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));

        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");

static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
                                     struct obd_info *oinfo)
        struct ost_body *body;

        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);

static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

                CERROR("can't unpack ost_body\n");
                aa->aa_oi->oi_oa->o_valid = 0;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;

        ptlrpc_set_add_req(set, req);

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        ptlrpc_req_finished(req);

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        ptlrpc_req_finished(req);

static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
        struct osc_async_args *aa;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;

        ptlrpc_req_set_repsize(req, 2, size);
        /* do MDS-to-OST setattr asynchronously */
                /* Do not wait for response. */
                ptlrpcd_add_req(req);

                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;

                ptlrpc_set_add_req(rqset, req);

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

                rc = obd_alloc_memmd(exp, &lsm);

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
                CERROR("can't unpack ost_body\n");
                GOTO(out_req, rc = -EPROTO);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;

                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));

        ptlrpc_req_finished(req);

        obd_free_memmd(exp, &lsm);

static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;

        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;

        ptlrpc_set_add_req(rqset, req);

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);

        ptlrpc_req_finished(req);

/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added to the @cancels list. Returns the number of
 * locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);

/* Destroy requests can always be async on the client, and we don't even really
 * care about the return code, since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing the destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count, OST_DESTROY);

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
        else
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));

        ptlrpc_req_set_repsize(req, 2, size);

        ptlrpcd_add_req(req);

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
        obd_flag bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
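
/*
 * Worked example for osc_announce_cached() above (hypothetical tuning
 * values): with cl_max_pages_per_rpc = 256, CFS_PAGE_SHIFT = 12 (4K pages)
 * and cl_max_rpcs_in_flight = 8, max_in_flight = (256 << 12) * 9 = 9 MB.
 * If cl_dirty_max is 32 MB, then o_undirty = max(32 MB, 9 MB) = 32 MB,
 * i.e. how much more this client could still dirty, which the OST uses
 * when deciding how much grant to hand back.
 */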

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong. Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);

                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
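
/*
 * Worked example for the short-write branch above (hypothetical sizes):
 * CFS_PAGE_SIZE = 16384, OST blocksize = 4096, and a write of
 * count = 5000 bytes at page offset 0.  end = 5000 & 4095 = 904, so
 * count rounds up to 5000 + (4096 - 904) = 8192: the OST charges two
 * 4K blocks, and the client books 16384 - 8192 = 8192 bytes into
 * cl_lost_grant for the rest of the page it had consumed grant for.
 */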

static unsigned long rpcs_in_flight(struct client_obd *cli)
        return cli->cl_r_in_flight + cli->cl_w_in_flight;

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);

                /* if there is still dirty cache but no grant, wait for pending
                 * RPCs that may yet return us some grant before doing sync
                 * writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);

                cfs_waitq_signal(&ocw->ocw_waitq);

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);

                nob_read -= pga[i]->count;

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
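
/*
 * Worked example for handle_short_read() (hypothetical values): a 3-page
 * read of 4096 bytes per page that returns nob_read = 6000 leaves pga[0]
 * fully valid, zero-fills pga[1] from byte 6000 - 4096 = 1904 onward, and
 * zeroes pga[2] entirely, so the region past EOF on this stripe reads
 * back as zeroes.
 */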

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");

        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);

        return (p1->off + p1->count == p2->off);
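
/*
 * Illustrative sketch (hypothetical pages, not original code): two
 * brw_pages merge into a single remote niobuf only when they are
 * byte-contiguous and their flags agree (modulo OBD_BRW_FROM_GRANT).
 */
#if 0
static void can_merge_pages_sketch(void)
{
        struct brw_page a = { .off = 0,     .count = 4096, .flag = 0 };
        struct brw_page b = { .off = 4096,  .count = 4096, .flag = 0 };
        struct brw_page c = { .off = 12288, .count = 4096, .flag = 0 };

        LASSERT(can_merge_pages(&a, &b));  /* 0 + 4096 == 4096: contiguous */
        LASSERT(!can_merge_pages(&b, &c)); /* 4096 + 4096 != 12288: a gap */
}
#endif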

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
        LASSERT(pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;

        /* For sending we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa *capa;
        struct osc_brw_async_args *aa;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                pool = cli->cl_import->imp_rq_pool;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
                size[REQ_REC_OFF + 3] = sizeof(*capa);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
                capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                                      sizeof(*capa));
                capa_cpy(capa, ocapa);
                body->oa.o_valid |= OBD_MD_FLOSSCAPA;

        LASSERT(page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        /* size[REQ_REC_OFF] still sizeof(*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;

        INIT_LIST_HEAD(&aa->aa_oaps);

        ptlrpc_req_finished(req);

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum,
                                int nob, obd_count page_count,
                                struct brw_page **pga)
        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

        new_cksum = osc_checksum_bulk(nob, page_count, pga);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT)

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
                CERROR("Can't unpack body\n");

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         aa->aa_requested_nob,

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                GOTO(out, rc = -EAGAIN);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga);

                if (peer->nid == req->rq_bulk->bd_sender) {
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum. Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_generation : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                           body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                           1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        aa->aa_oa->o_cksum = client_cksum;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));

        *aa->aa_oa = body->oa;

static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct l_wait_info lwi;

        cfs_waitq_init(&waitq);

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");

                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
                                       NULL);
                l_wait_event(waitq, 0, &lwi);

int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");

        ptlrpc_set_add_req(set, new_req);
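
/*
 * Background sketch for the list_splice() dance above (generic list_head
 * behaviour, not original code): assigning one list_head to another copies
 * only the head's two pointers, while the first and last members still
 * point back at the old head, leaving the copy corrupt.  Moving the list
 * needs list_splice() plus re-initialization of the old head, which is
 * exactly what osc_brw_redo_request() does with aa_oaps.  'oap' below is
 * a hypothetical queued page.
 */
#if 0
static void list_move_sketch(struct osc_async_page *oap)
{
        CFS_LIST_HEAD(src);
        CFS_LIST_HEAD(dst);

        list_add(&oap->oap_rpc_item, &src);

        /* dst = src;  WRONG: oap's pointers would still reference src */

        list_splice(&src, &dst);  /* right: relinks both ends onto dst */
        INIT_LIST_HEAD(&src);     /* src must be re-initialized after */
}
#endif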

static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
        struct osc_brw_async_args *aa = data;

        rc = osc_brw_fini_request(req, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);

        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                aa->aa_cli->cl_w_in_flight--;
        else
                aa->aa_cli->cl_r_in_flight--;
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_brw_async_args *aa;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on the OST due to grant. */
        if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);
                spin_unlock(&cli->cl_loi_list_lock);

        rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
                                  &req, ocapa);
                aa = (struct osc_brw_async_args *)&req->rq_async_args;
                if (cmd == OBD_BRW_READ) {
                        lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                        ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
                } else {
                        lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                        lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                         cli->cl_w_in_flight);
                        ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
                }

                req->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, req);
                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (cmd == OBD_BRW_READ)
                        cli->cl_r_in_flight++;
                else
                        cli->cl_w_in_flight++;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
        } else if (cmd == OBD_BRW_WRITE) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

/*
 * ugh, we want disk allocation on the target to happen in offset order. we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
        struct brw_page *tmp;

        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
                ;

        do {
                stride /= 3;
                for (i = stride ; i < num ; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
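
/*
 * Worked example for sort_brw_pages() (hypothetical offsets): for num = 5
 * the generator loop leaves stride = 13, so the do/while uses strides
 * 13/3 = 4 and then 4/3 = 1.  Offsets {16384, 0, 8192, 4096, 12288} are
 * first 4-sorted and then 1-sorted (a plain insertion sort) into
 * {0, 4096, 8192, 12288, 16384}.
 */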

static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
        LASSERT(pages > 0);
        offset = pg[i]->off & ~CFS_PAGE_MASK;

                if (pages == 0) /* that's all */
                        return count;

                if (offset + pg[i]->count < CFS_PAGE_SIZE)
                        return count; /* doesn't end on page boundary */

                offset = pg[i]->off & ~CFS_PAGE_MASK;
                if (offset != 0) /* doesn't start on page boundary */
                        return count;
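
/*
 * Worked example for max_unfragmented_pages() (hypothetical 4K pages):
 * {off 0, count 4096}, {off 4096, count 4096}, {off 8192, count 2048}
 * returns 3, since the short final page merely ends the candidate RPC.
 * If instead the second page started at a non-zero in-page offset, the
 * result would be 1, keeping each bulk RPC free of holes the network
 * could not transfer in a single RDMA.
 */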

static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
        struct brw_page **ppga;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);

        for (i = 0; i < count; i++)
                ppga[i] = &pga[i];

static void osc_release_ppga(struct brw_page **ppga, obd_count count)
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);

static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)

        orig = ppga = osc_build_ppga(pga, page_count);

        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                        OBD_FREE(copy, sizeof(*copy) * pages_per_brw);

                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */

                page_count -= pages_per_brw;
                ppga += pages_per_brw;

        osc_release_ppga(orig, page_count_orig);

static void osc_check_rpcs(struct client_obd *cli);

/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting. Writeback completes or truncate happens before
 * writing starts. Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);

/* This maintains the lists of pending pages to read/write for a given object
 * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
        if (lop->lop_num_pending == 0)

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages. recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)

        /* stream rpcs in queue order as long as there is an urgent page
         * queued. this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space. as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;

        if (lop->lop_num_pending >= optimal)

static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);

/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);

static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;

/* this is called when a sync waiter receives an interruption. Its job is to
 * get the caller woken as soon as possible. If its page hasn't been put in an
 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
 * desiring interruption, which will forcefully complete the rpc once the rpc
 * completes. */
static void osc_occ_interrupted(struct oig_callback_context *occ)
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists. */
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;

        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);

/* this is trying to propagate async writeback errors back up to the
 * application. As an async write fails we record the error code for later if
 * the app does an fsync. As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                           int rc)
                ar->ar_force_sync = 1;
                ar->ar_min_xid = ptlrpc_sample_next_xid();

        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
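
/*
 * Worked example for osc_process_ar() (hypothetical xids): when an async
 * write with xid 100 fails, ar_force_sync is set and ar_min_xid is sampled
 * as, say, 101.  Completions of writes already in flight (xid < 101) leave
 * the OSC in sync mode; only a successful write issued after the failure
 * (xid >= 101) clears ar_force_sync again.
 */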

static void osc_oap_to_pending(struct osc_async_page *oap)
        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
        else
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);

/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;

                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
                osc_exit_cache(cli, oap, sent);

static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
                RETURN(ERR_PTR(-ENOMEM));

                GOTO(out, req = ERR_PTR(-ENOMEM));

        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                ops = oap->oap_caller_ops;
                caller_data = oap->oap_caller_data;

                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap,
                       pga[i]->flag);

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST). If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps. Sadly, there is no obvious
         * way to do this in a single call. bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

        OBD_FREE(pga, sizeof(*pga) * page_count);

/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned starting_offset = 0;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it. commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns. if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list). we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                       "instead of ready\n", oap,
                                       oap->oap_page, rc);

                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later. we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 */

                                /* the io isn't needed; tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;

                                oap->oap_async_flags |= ASYNC_READY;

                                LASSERTF(0, "oap %p page %p returned %d "
                                         "from make_ready\n", oap,
                                         oap->oap_page, rc);
2128 * Page submitted for IO has to be locked. Either by
2129 * ->ap_make_ready() or by higher layers.
2131 * XXX nikita: this assertion should be adjusted when lustre
2132 * starts using PG_writeback for pages being written out.
2134 #if defined(__KERNEL__) && defined(__LINUX__)
2135 LASSERT(PageLocked(oap->oap_page));
2137 /* If there is a gap at the start of this page, it can't merge
2138 * with any previous page, so we'll hand the network a
2139 * "fragmented" page array that it can't transfer in 1 RDMA */
2140 if (page_count != 0 && oap->oap_page_off != 0)
2143 /* take the page out of our book-keeping */
2144 list_del_init(&oap->oap_pending_item);
2145 lop_update_pending(cli, lop, cmd, -1);
2146 list_del_init(&oap->oap_urgent_item);
2148 if (page_count == 0)
2149 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2150 (PTLRPC_MAX_BRW_SIZE - 1);
2152 /* ask the caller for the size of the io as the rpc leaves. */
2153 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2155 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2156 if (oap->oap_count <= 0) {
2157 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2159 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2163 /* now put the page back in our accounting */
2164 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2165 if (++page_count >= cli->cl_max_pages_per_rpc)
2168 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2169 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2170 * have the same alignment as the initial writes that allocated
2171 * extents on the server. */
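/* A worked example of the check below (a sketch, assuming
 * PTLRPC_MAX_BRW_SIZE == 1MB and CFS_PAGE_SIZE == 4KB): a page whose
 * io ends at object offset 0x100000 gives
 * ending_offset = 0x100000 & 0xfffff == 0, so we close the rpc exactly
 * on the 1MB boundary; a page ending at 0x101000 gives 0x1000 and we
 * keep accumulating pages. */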
2172 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2173 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2174 if (ending_offset == 0)
2177 /* If there is a gap at the end of this page, it can't merge
2178 * with any subsequent pages, so we'll hand the network a
2179 * "fragmented" page array that it can't transfer in 1 RDMA */
2180 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2184 osc_wake_cache_waiters(cli);
2186 if (page_count == 0)
2189 loi_list_maint(cli, loi);
2191 client_obd_list_unlock(&cli->cl_loi_list_lock);
2193 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2195 /* this should happen rarely and is pretty bad; it makes the
2196  * pending list not follow the dirty order */
2197 client_obd_list_lock(&cli->cl_loi_list_lock);
2198 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2199 list_del_init(&oap->oap_rpc_item);
2201 /* queued sync pages can be torn down while the pages
2202  * are between the pending list and the rpc */
2203 if (oap->oap_interrupted) {
2204 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2205 osc_ap_completion(cli, NULL, oap, 0,
2209 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2211 loi_list_maint(cli, loi);
2212 RETURN(PTR_ERR(req));
2215 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2217 if (cmd == OBD_BRW_READ) {
2218 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2219 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2220 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2221 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2222 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2224 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2225 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2226 cli->cl_w_in_flight);
2227 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2228 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2229 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2232 client_obd_list_lock(&cli->cl_loi_list_lock);
2234 if (cmd == OBD_BRW_READ)
2235 cli->cl_r_in_flight++;
2237 cli->cl_w_in_flight++;
2239 /* queued sync pages can be torn down while the pages
2240  * are between the pending list and the rpc */
2242 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2243 /* only one oap gets a request reference */
2246 if (oap->oap_interrupted && !req->rq_intr) {
2247 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2249 ptlrpc_mark_interrupted(req);
2253 tmp->oap_request = ptlrpc_request_addref(req);
2255 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2256 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2258 req->rq_interpret_reply = brw_interpret_oap;
2259 ptlrpcd_add_req(req);
2263 #define LOI_DEBUG(LOI, STR, args...) \
2264 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2265 !list_empty(&(LOI)->loi_cli_item), \
2266 (LOI)->loi_write_lop.lop_num_pending, \
2267 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2268 (LOI)->loi_read_lop.lop_num_pending, \
2269 !list_empty(&(LOI)->loi_read_lop.lop_urgent),           \
2270 args)
2272 /* This is called by osc_check_rpcs() to find which objects have pages that
2273 * we could be sending. These lists are maintained by lop_makes_rpc(). */
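/* Selection below happens in priority order: (1) objects already known
 * to have rpc-ready pages, (2) any object with queued writes when there
 * are cache waiters, then (3) every queued object when the import is
 * invalid, so everything gets flushed. */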
2274 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2277 /* first return all objects which we already know to have
2278 * pages ready to be stuffed into rpcs */
2279 if (!list_empty(&cli->cl_loi_ready_list))
2280 RETURN(list_entry(cli->cl_loi_ready_list.next,
2281 struct lov_oinfo, loi_cli_item));
2283 /* then if we have cache waiters, return all objects with queued
2284 * writes. This is especially important when many small files
2285 * have filled up the cache and not been fired into rpcs because
2286  * they don't pass the nr_pending/object threshold */
2287 if (!list_empty(&cli->cl_cache_waiters) &&
2288 !list_empty(&cli->cl_loi_write_list))
2289 RETURN(list_entry(cli->cl_loi_write_list.next,
2290 struct lov_oinfo, loi_write_item));
2292 /* then return all queued objects when we have an invalid import
2293 * so that they get flushed */
2294 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2295 if (!list_empty(&cli->cl_loi_write_list))
2296 RETURN(list_entry(cli->cl_loi_write_list.next,
2297 struct lov_oinfo, loi_write_item));
2298 if (!list_empty(&cli->cl_loi_read_list))
2299 RETURN(list_entry(cli->cl_loi_read_list.next,
2300 struct lov_oinfo, loi_read_item));
2305 /* called with the loi list lock held */
2306 static void osc_check_rpcs(struct client_obd *cli)
2308 struct lov_oinfo *loi;
2309 int rc = 0, race_counter = 0;
2312 while ((loi = osc_next_loi(cli)) != NULL) {
2313 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2315 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2318 /* attempt some read/write balancing by alternating between
2319  * reads and writes in an object. the makes_rpc checks here
2320  * would be redundant if we were handed read/write work items
2321  * instead of whole objects. we don't want send_oap_rpc to drain
2322  * a partial read pending queue when we were given this object
2323  * to do write io while there are cache waiters */
2324 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2325 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2326 &loi->loi_write_lop);
2334 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2335 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2336 &loi->loi_read_lop);
2345 /* attempt some inter-object balancing by issuing rpcs
2346  * for each object in turn */
2347 if (!list_empty(&loi->loi_cli_item))
2348 list_del_init(&loi->loi_cli_item);
2349 if (!list_empty(&loi->loi_write_item))
2350 list_del_init(&loi->loi_write_item);
2351 if (!list_empty(&loi->loi_read_item))
2352 list_del_init(&loi->loi_read_item);
2354 loi_list_maint(cli, loi);
2356 /* send_oap_rpc fails with 0 when make_ready tells it to
2357 * back off. llite's make_ready does this when it tries
2358 * to lock a page queued for write that is already locked.
2359 * we want to try sending rpcs from many objects, but we
2360 * don't want to spin failing with 0. */
2361 if (race_counter == 10)
2367 /* we're trying to queue a page in the osc so we're subject to the
2368 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2369 * If the osc's queued pages are already at that limit, then we want to sleep
2370 * until there is space in the osc's queue for us. We also may be waiting for
2371 * write credits from the OST if there are RPCs in flight that may return some
2372 * before we fall back to sync writes.
2374  * We need this to know our allocation was granted in the presence of signals */
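/* A waiter counts as granted once osc_wake_cache_waiters() has removed
 * it from cl_cache_waiters, i.e. list_empty(&ocw->ocw_entry) below. If
 * no rpcs remain in flight, nothing is left that could grant us space,
 * so we also stop waiting and let the caller fall back to sync io. */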
2375 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2379 client_obd_list_lock(&cli->cl_loi_list_lock);
2380 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2381 client_obd_list_unlock(&cli->cl_loi_list_lock);
2385 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2386 * grant or cache space. */
2387 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2388 struct osc_async_page *oap)
2390 struct osc_cache_waiter ocw;
2391 struct l_wait_info lwi = { 0 };
2395 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2396 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2397 cli->cl_dirty_max, obd_max_dirty_pages,
2398 cli->cl_lost_grant, cli->cl_avail_grant);
2400 /* force the caller to try sync io. this can jump the list
2401 * of queued writes and create a discontiguous rpc stream */
2402 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2403 loi->loi_ar.ar_force_sync)
2406 /* Hopefully normal case - cache space and write credits available */
2407 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2408 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2409 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2410 /* account for ourselves */
2411 osc_consume_write_grant(cli, &oap->oap_brw_page);
2415 /* Make sure that there are write rpcs in flight to wait for. This
2416 * is a little silly as this object may not have any pending but
2417 * other objects sure might. */
2418 if (cli->cl_w_in_flight) {
2419 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2420 cfs_waitq_init(&ocw.ocw_waitq);
2424 loi_list_maint(cli, loi);
2425 osc_check_rpcs(cli);
2426 client_obd_list_unlock(&cli->cl_loi_list_lock);
2428 CDEBUG(D_CACHE, "sleeping for cache space\n");
2429 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2431 client_obd_list_lock(&cli->cl_loi_list_lock);
2432 if (!list_empty(&ocw.ocw_entry)) {
2433 list_del(&ocw.ocw_entry);
2442 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2443 struct lov_oinfo *loi, cfs_page_t *page,
2444 obd_off offset, struct obd_async_page_ops *ops,
2445 void *data, void **res)
2447 struct osc_async_page *oap;
2451 return size_round(sizeof(*oap));
2454 oap->oap_magic = OAP_MAGIC;
2455 oap->oap_cli = &exp->exp_obd->u.cli;
2458 oap->oap_caller_ops = ops;
2459 oap->oap_caller_data = data;
2461 oap->oap_page = page;
2462 oap->oap_obj_off = offset;
2464 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2465 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2466 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2468 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2470 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2474 struct osc_async_page *oap_from_cookie(void *cookie)
2476 struct osc_async_page *oap = cookie;
2477 if (oap->oap_magic != OAP_MAGIC)
2478 return ERR_PTR(-EINVAL);
2482 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2483 struct lov_oinfo *loi, void *cookie,
2484 int cmd, obd_off off, int count,
2485 obd_flag brw_flags, enum async_flags async_flags)
2487 struct client_obd *cli = &exp->exp_obd->u.cli;
2488 struct osc_async_page *oap;
2492 oap = oap_from_cookie(cookie);
2494 RETURN(PTR_ERR(oap));
2496 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2499 if (!list_empty(&oap->oap_pending_item) ||
2500 !list_empty(&oap->oap_urgent_item) ||
2501 !list_empty(&oap->oap_rpc_item))
2504 /* check if the file's owner/group is over quota */
2505 #ifdef HAVE_QUOTA_SUPPORT
2506 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2507 struct obd_async_page_ops *ops;
2514 ops = oap->oap_caller_ops;
2515 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2516 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2527 loi = lsm->lsm_oinfo[0];
2529 client_obd_list_lock(&cli->cl_loi_list_lock);
2532 oap->oap_page_off = off;
2533 oap->oap_count = count;
2534 oap->oap_brw_flags = brw_flags;
2535 oap->oap_async_flags = async_flags;
2537 if (cmd & OBD_BRW_WRITE) {
2538 rc = osc_enter_cache(cli, loi, oap);
2540 client_obd_list_unlock(&cli->cl_loi_list_lock);
2545 osc_oap_to_pending(oap);
2546 loi_list_maint(cli, loi);
2548 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2551 osc_check_rpcs(cli);
2552 client_obd_list_unlock(&cli->cl_loi_list_lock);
2557 /* aka (~was & now & flag), but this is more clear :) */
2558 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
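/* For example, SETTING(0, ASYNC_READY, ASYNC_READY) is true (the flag
 * is being newly set), while SETTING(ASYNC_READY, ASYNC_READY,
 * ASYNC_READY) is false because the flag was already present in
 * 'was'. */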
2560 static int osc_set_async_flags(struct obd_export *exp,
2561 struct lov_stripe_md *lsm,
2562 struct lov_oinfo *loi, void *cookie,
2563 obd_flag async_flags)
2565 struct client_obd *cli = &exp->exp_obd->u.cli;
2566 struct loi_oap_pages *lop;
2567 struct osc_async_page *oap;
2571 oap = oap_from_cookie(cookie);
2573 RETURN(PTR_ERR(oap));
2576  * bug 7311: OST-side locking is only supported for liblustre for now
2577  * (and liblustre never calls obd_set_async_flags(). I hope.); a generic
2578  * implementation has to handle the case where an OST-locked page was
2579  * picked up by, e.g., ->writepage().
2581 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2582 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2583                              * tread here. */
2585 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2589 loi = lsm->lsm_oinfo[0];
2591 if (oap->oap_cmd & OBD_BRW_WRITE) {
2592 lop = &loi->loi_write_lop;
2594 lop = &loi->loi_read_lop;
2597 client_obd_list_lock(&cli->cl_loi_list_lock);
2599 if (list_empty(&oap->oap_pending_item))
2600 GOTO(out, rc = -EINVAL);
2602 if ((oap->oap_async_flags & async_flags) == async_flags)
2605 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2606 oap->oap_async_flags |= ASYNC_READY;
2608 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2609 if (list_empty(&oap->oap_rpc_item)) {
2610 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2611 loi_list_maint(cli, loi);
2615 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2616 oap->oap_async_flags);
2618 osc_check_rpcs(cli);
2619 client_obd_list_unlock(&cli->cl_loi_list_lock);
2623 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2624 struct lov_oinfo *loi,
2625 struct obd_io_group *oig, void *cookie,
2626 int cmd, obd_off off, int count,
2628 obd_flag async_flags)
2630 struct client_obd *cli = &exp->exp_obd->u.cli;
2631 struct osc_async_page *oap;
2632 struct loi_oap_pages *lop;
2636 oap = oap_from_cookie(cookie);
2638 RETURN(PTR_ERR(oap));
2640 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2643 if (!list_empty(&oap->oap_pending_item) ||
2644 !list_empty(&oap->oap_urgent_item) ||
2645 !list_empty(&oap->oap_rpc_item))
2649 loi = lsm->lsm_oinfo[0];
2651 client_obd_list_lock(&cli->cl_loi_list_lock);
2654 oap->oap_page_off = off;
2655 oap->oap_count = count;
2656 oap->oap_brw_flags = brw_flags;
2657 oap->oap_async_flags = async_flags;
2659 if (cmd & OBD_BRW_WRITE)
2660 lop = &loi->loi_write_lop;
2662 lop = &loi->loi_read_lop;
2664 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2665 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2667 rc = oig_add_one(oig, &oap->oap_occ);
2670 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2671 oap, oap->oap_page, rc);
2673 client_obd_list_unlock(&cli->cl_loi_list_lock);
2678 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2679 struct loi_oap_pages *lop, int cmd)
2681 struct list_head *pos, *tmp;
2682 struct osc_async_page *oap;
2684 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2685 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2686 list_del(&oap->oap_pending_item);
2687 osc_oap_to_pending(oap);
2689 loi_list_maint(cli, loi);
2692 static int osc_trigger_group_io(struct obd_export *exp,
2693 struct lov_stripe_md *lsm,
2694 struct lov_oinfo *loi,
2695 struct obd_io_group *oig)
2697 struct client_obd *cli = &exp->exp_obd->u.cli;
2701 loi = lsm->lsm_oinfo[0];
2703 client_obd_list_lock(&cli->cl_loi_list_lock);
2705 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2706 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2708 osc_check_rpcs(cli);
2709 client_obd_list_unlock(&cli->cl_loi_list_lock);
2714 static int osc_teardown_async_page(struct obd_export *exp,
2715 struct lov_stripe_md *lsm,
2716 struct lov_oinfo *loi, void *cookie)
2718 struct client_obd *cli = &exp->exp_obd->u.cli;
2719 struct loi_oap_pages *lop;
2720 struct osc_async_page *oap;
2724 oap = oap_from_cookie(cookie);
2726 RETURN(PTR_ERR(oap));
2729 loi = lsm->lsm_oinfo[0];
2731 if (oap->oap_cmd & OBD_BRW_WRITE) {
2732 lop = &loi->loi_write_lop;
2734 lop = &loi->loi_read_lop;
2737 client_obd_list_lock(&cli->cl_loi_list_lock);
2739 if (!list_empty(&oap->oap_rpc_item))
2740 GOTO(out, rc = -EBUSY);
2742 osc_exit_cache(cli, oap, 0);
2743 osc_wake_cache_waiters(cli);
2745 if (!list_empty(&oap->oap_urgent_item)) {
2746 list_del_init(&oap->oap_urgent_item);
2747 oap->oap_async_flags &= ~ASYNC_URGENT;
2749 if (!list_empty(&oap->oap_pending_item)) {
2750 list_del_init(&oap->oap_pending_item);
2751 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2753 loi_list_maint(cli, loi);
2755 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2757 client_obd_list_unlock(&cli->cl_loi_list_lock);
2761 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2764 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2767 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2770 lock_res_and_lock(lock);
2773 /* Liang XXX: Darwin and Winnt checking should be added */
2774 if (lock->l_ast_data && lock->l_ast_data != data) {
2775 struct inode *new_inode = data;
2776 struct inode *old_inode = lock->l_ast_data;
2777 if (!(old_inode->i_state & I_FREEING))
2778 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2779 LASSERTF(old_inode->i_state & I_FREEING,
2780 "Found existing inode %p/%lu/%u state %lu in lock: "
2781 "setting data to %p/%lu/%u\n", old_inode,
2782 old_inode->i_ino, old_inode->i_generation,
2784 new_inode, new_inode->i_ino, new_inode->i_generation);
2788 lock->l_ast_data = data;
2789 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2790 unlock_res_and_lock(lock);
2791 LDLM_LOCK_PUT(lock);
2794 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2795 ldlm_iterator_t replace, void *data)
2797 struct ldlm_res_id res_id = { .name = {0} };
2798 struct obd_device *obd = class_exp2obd(exp);
2800 res_id.name[0] = lsm->lsm_object_id;
2801 res_id.name[2] = lsm->lsm_object_gr;
2803 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2807 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2813 /* The request was created before the ldlm_cli_enqueue() call. */
2814 if (rc == ELDLM_LOCK_ABORTED) {
2815 struct ldlm_reply *rep;
2817 /* swabbed by ldlm_cli_enqueue() */
2818 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2819 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2821 LASSERT(rep != NULL);
2822 if (rep->lock_policy_res1)
2823 rc = rep->lock_policy_res1;
2827 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2828 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2829 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2830 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2831 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2834 /* Call the update callback. */
2835 rc = oinfo->oi_cb_up(oinfo, rc);
2839 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2840 struct osc_enqueue_args *aa, int rc)
2842 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2843 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2844 struct ldlm_lock *lock;
2846 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2847  * still be valid. */
2848 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2850 /* Complete obtaining the lock procedure. */
2851 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2853 &aa->oa_oi->oi_flags,
2854 &lsm->lsm_oinfo[0]->loi_lvb,
2855 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2856 lustre_swab_ost_lvb,
2857 aa->oa_oi->oi_lockh, rc);
2859 /* Complete osc stuff. */
2860 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2862 /* Release the lock for async request. */
2863 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2864 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2866 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2867 aa->oa_oi->oi_lockh, req, aa);
2868 LDLM_LOCK_PUT(lock);
2872 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2873  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2874  * other synchronous requests; however, holding some locks while trying to
2875  * obtain others may take a considerable amount of time in the case of ost
2876  * failure, and when sync requests cannot get a lock released from a client,
2877  * that client is excluded from the cluster -- such scenarios make life
2878  * difficult, so release locks just after they are obtained. */
2879 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2880 struct ldlm_enqueue_info *einfo,
2881 struct ptlrpc_request_set *rqset)
2883 struct ldlm_res_id res_id = { .name = {0} };
2884 struct obd_device *obd = exp->exp_obd;
2885 struct ldlm_reply *rep;
2886 struct ptlrpc_request *req = NULL;
2887 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2891 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2892 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2894 /* Filesystem lock extents are extended to page boundaries so that
2895 * dealing with the page cache is a little smoother. */
2896 oinfo->oi_policy.l_extent.start -=
2897 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2898 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
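/* A worked example of the rounding above (a sketch, assuming 4KB pages,
 * so ~CFS_PAGE_MASK == 0xfff): a byte range [5000, 9000] is widened to
 * [4096, 12287], i.e. to whole pages. */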
2900 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2903 /* Next, search for already existing extent locks that will cover us */
2904 rc = ldlm_lock_match(obd->obd_namespace,
2905 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2906 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2909 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2912 /* I would like to be able to ASSERT here that rss <=
2913 * kms, but I can't, for reasons which are explained in
2917 /* We already have a lock, and it's referenced */
2918 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2920 /* For async requests, decref the lock. */
2922 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2927 /* If we're trying to read, we also search for an existing PW lock. The
2928 * VFS and page cache already protect us locally, so lots of readers/
2929 * writers can share a single PW lock.
2931 * There are problems with conversion deadlocks, so instead of
2932 * converting a read lock to a write lock, we'll just enqueue a new
2935 * At some point we should cancel the read lock instead of making them
2936 * send us a blocking callback, but there are problems with canceling
2937 * locks out from other users right now, too. */
2939 if (einfo->ei_mode == LCK_PR) {
2940 rc = ldlm_lock_match(obd->obd_namespace,
2941 oinfo->oi_flags | LDLM_FL_LVB_READY,
2942 &res_id, einfo->ei_type, &oinfo->oi_policy,
2943 LCK_PW, oinfo->oi_lockh);
2945 /* FIXME: This is not incredibly elegant, but it might
2946 * be more elegant than adding another parameter to
2947 * lock_match. I want a second opinion. */
2948 /* addref the lock only if not async requests. */
2950 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2951 osc_set_data_with_check(oinfo->oi_lockh,
2954 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2955 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2963 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2964 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2965 [DLM_LOCKREQ_OFF + 1] = 0 };
2967 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2971 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2972 size[DLM_REPLY_REC_OFF] =
2973 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2974 ptlrpc_req_set_repsize(req, 3, size);
2977 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2978 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2980 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2981 &oinfo->oi_policy, &oinfo->oi_flags,
2982 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2983 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2984 lustre_swab_ost_lvb, oinfo->oi_lockh,
2988 struct osc_enqueue_args *aa;
2989 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2990 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2995 req->rq_interpret_reply = osc_enqueue_interpret;
2996 ptlrpc_set_add_req(rqset, req);
2997 } else if (intent) {
2998 ptlrpc_req_finished(req);
3003 rc = osc_enqueue_fini(req, oinfo, intent, rc);
3005 ptlrpc_req_finished(req);
3010 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3011 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3012 int *flags, void *data, struct lustre_handle *lockh)
3014 struct ldlm_res_id res_id = { .name = {0} };
3015 struct obd_device *obd = exp->exp_obd;
3017 int lflags = *flags;
3020 res_id.name[0] = lsm->lsm_object_id;
3021 res_id.name[2] = lsm->lsm_object_gr;
3023 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3025 /* Filesystem lock extents are extended to page boundaries so that
3026 * dealing with the page cache is a little smoother */
3027 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3028 policy->l_extent.end |= ~CFS_PAGE_MASK;
3030 /* Next, search for already existing extent locks that will cover us */
3031 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3032 &res_id, type, policy, mode, lockh);
3034 //if (!(*flags & LDLM_FL_TEST_LOCK))
3035 osc_set_data_with_check(lockh, data, lflags);
3038 /* If we're trying to read, we also search for an existing PW lock. The
3039 * VFS and page cache already protect us locally, so lots of readers/
3040 * writers can share a single PW lock. */
3041 if (mode == LCK_PR) {
3042 rc = ldlm_lock_match(obd->obd_namespace,
3043 lflags | LDLM_FL_LVB_READY, &res_id,
3044 type, policy, LCK_PW, lockh);
3045 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
3046 /* FIXME: This is not incredibly elegant, but it might
3047 * be more elegant than adding another parameter to
3048 * lock_match. I want a second opinion. */
3049 osc_set_data_with_check(lockh, data, lflags);
3050 ldlm_lock_addref(lockh, LCK_PR);
3051 ldlm_lock_decref(lockh, LCK_PW);
3057 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3058 __u32 mode, struct lustre_handle *lockh)
3062 if (unlikely(mode == LCK_GROUP))
3063 ldlm_lock_decref_and_cancel(lockh, mode);
3065 ldlm_lock_decref(lockh, mode);
3070 static int osc_cancel_unused(struct obd_export *exp,
3071 struct lov_stripe_md *lsm, int flags,
3074 struct obd_device *obd = class_exp2obd(exp);
3075 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3078 res_id.name[0] = lsm->lsm_object_id;
3079 res_id.name[2] = lsm->lsm_object_gr;
3083 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3086 static int osc_join_lru(struct obd_export *exp,
3087 struct lov_stripe_md *lsm, int join)
3089 struct obd_device *obd = class_exp2obd(exp);
3090 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3093 res_id.name[0] = lsm->lsm_object_id;
3094 res_id.name[2] = lsm->lsm_object_gr;
3098 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3101 static int osc_statfs_interpret(struct ptlrpc_request *req,
3102 struct osc_async_args *aa, int rc)
3104 struct obd_statfs *msfs;
3110 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3111 lustre_swab_obd_statfs);
3113 CERROR("Can't unpack obd_statfs\n");
3114 GOTO(out, rc = -EPROTO);
3117 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3119 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3123 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3124 __u64 max_age, struct ptlrpc_request_set *rqset)
3126 struct ptlrpc_request *req;
3127 struct osc_async_args *aa;
3128 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3131 /* We could possibly pass max_age in the request (as an absolute
3132 * timestamp or a "seconds.usec ago") so the target can avoid doing
3133 * extra calls into the filesystem if that isn't necessary (e.g.
3134 * during mount that would help a bit). Having relative timestamps
3135 * is not so great if request processing is slow, while absolute
3136 * timestamps are not ideal because they need time synchronization. */
3137 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3138 OST_STATFS, 1, NULL, NULL);
3142 ptlrpc_req_set_repsize(req, 2, size);
3143 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3145 req->rq_interpret_reply = osc_statfs_interpret;
3146 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3147 aa = (struct osc_async_args *)&req->rq_async_args;
3150 ptlrpc_set_add_req(rqset, req);
3154 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3157 struct obd_statfs *msfs;
3158 struct ptlrpc_request *req;
3159 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3162 /* We could possibly pass max_age in the request (as an absolute
3163 * timestamp or a "seconds.usec ago") so the target can avoid doing
3164 * extra calls into the filesystem if that isn't necessary (e.g.
3165 * during mount that would help a bit). Having relative timestamps
3166 * is not so great if request processing is slow, while absolute
3167 * timestamps are not ideal because they need time synchronization. */
3168 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3169 OST_STATFS, 1, NULL, NULL);
3173 ptlrpc_req_set_repsize(req, 2, size);
3174 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3176 rc = ptlrpc_queue_wait(req);
3180 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3181 lustre_swab_obd_statfs);
3183 CERROR("Can't unpack obd_statfs\n");
3184 GOTO(out, rc = -EPROTO);
3187 memcpy(osfs, msfs, sizeof(*osfs));
3191 ptlrpc_req_finished(req);
3195 /* Retrieve object striping information.
3197  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3198  * the maximum number of OST indices which will fit in the user buffer.
3199  * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here). */
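/* A hypothetical caller sketch (user space; the exact ioctl plumbing
 * that routes LL_IOC_LOV_GETSTRIPE down to this function is assumed,
 * not shown):
 *
 *      struct lov_user_md *lump;
 *
 *      lump = malloc(sizeof(*lump) + sizeof(lump->lmm_objects[0]));
 *      lump->lmm_magic = LOV_USER_MAGIC;
 *      lump->lmm_stripe_count = 1;
 *      ioctl(fd, LL_IOC_LOV_GETSTRIPE, lump);
 */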
3201 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3203 struct lov_user_md lum, *lumk;
3204 int rc = 0, lum_size;
3210 if (copy_from_user(&lum, lump, sizeof(lum)))
3213 if (lum.lmm_magic != LOV_USER_MAGIC)
3216 if (lum.lmm_stripe_count > 0) {
3217 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3218 OBD_ALLOC(lumk, lum_size);
3222 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3223 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3225 lum_size = sizeof(lum);
3229 lumk->lmm_object_id = lsm->lsm_object_id;
3230 lumk->lmm_object_gr = lsm->lsm_object_gr;
3231 lumk->lmm_stripe_count = 1;
3233 if (copy_to_user(lump, lumk, lum_size))
3237 OBD_FREE(lumk, lum_size);
3243 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3244 void *karg, void *uarg)
3246 struct obd_device *obd = exp->exp_obd;
3247 struct obd_ioctl_data *data = karg;
3251 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3254 if (!try_module_get(THIS_MODULE)) {
3255 CERROR("Can't get module. Is it alive?");
3260 case OBD_IOC_LOV_GET_CONFIG: {
3262 struct lov_desc *desc;
3263 struct obd_uuid uuid;
3267 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3268 GOTO(out, err = -EINVAL);
3270 data = (struct obd_ioctl_data *)buf;
3272 if (sizeof(*desc) > data->ioc_inllen1) {
3273 obd_ioctl_freedata(buf, len);
3274 GOTO(out, err = -EINVAL);
3277 if (data->ioc_inllen2 < sizeof(uuid)) {
3278 obd_ioctl_freedata(buf, len);
3279 GOTO(out, err = -EINVAL);
3282 desc = (struct lov_desc *)data->ioc_inlbuf1;
3283 desc->ld_tgt_count = 1;
3284 desc->ld_active_tgt_count = 1;
3285 desc->ld_default_stripe_count = 1;
3286 desc->ld_default_stripe_size = 0;
3287 desc->ld_default_stripe_offset = 0;
3288 desc->ld_pattern = 0;
3289 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3291 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3293 err = copy_to_user((void *)uarg, buf, len);
3296 obd_ioctl_freedata(buf, len);
3299 case LL_IOC_LOV_SETSTRIPE:
3300 err = obd_alloc_memmd(exp, karg);
3304 case LL_IOC_LOV_GETSTRIPE:
3305 err = osc_getstripe(karg, uarg);
3307 case OBD_IOC_CLIENT_RECOVER:
3308 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3313 case IOC_OSC_SET_ACTIVE:
3314 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3317 case OBD_IOC_POLL_QUOTACHECK:
3318 err = lquota_poll_check(quota_interface, exp,
3319 (struct if_quotacheck *)karg);
3322 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3323 cmd, cfs_curproc_comm());
3324 GOTO(out, err = -ENOTTY);
3327 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3330 module_put(THIS_MODULE);
3335 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3336 void *key, __u32 *vallen, void *val)
3339 if (!vallen || !val)
3342 if (keylen > strlen("lock_to_stripe") &&
3343 strcmp(key, "lock_to_stripe") == 0) {
3344 __u32 *stripe = val;
3345 *vallen = sizeof(*stripe);
3348 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3349 struct ptlrpc_request *req;
3351 char *bufs[2] = { NULL, key };
3352 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3354 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3355 OST_GET_INFO, 2, size, bufs);
3359 size[REPLY_REC_OFF] = *vallen;
3360 ptlrpc_req_set_repsize(req, 2, size);
3361 rc = ptlrpc_queue_wait(req);
3365 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3366 lustre_swab_ost_last_id);
3367 if (reply == NULL) {
3368 CERROR("Can't unpack OST last ID\n");
3369 GOTO(out, rc = -EPROTO);
3371 *((obd_id *)val) = *reply;
3373 ptlrpc_req_finished(req);
3379 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3382 struct llog_ctxt *ctxt;
3383 struct obd_import *imp = req->rq_import;
3389 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3392 rc = llog_initiator_connect(ctxt);
3394 CERROR("cannot establish connection for "
3395 "ctxt %p: %d\n", ctxt, rc);
3398 spin_lock(&imp->imp_lock);
3399 imp->imp_server_timeout = 1;
3400 imp->imp_pingable = 1;
3401 spin_unlock(&imp->imp_lock);
3402 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3407 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3408 void *key, obd_count vallen, void *val,
3409 struct ptlrpc_request_set *set)
3411 struct ptlrpc_request *req;
3412 struct obd_device *obd = exp->exp_obd;
3413 struct obd_import *imp = class_exp2cliimp(exp);
3414 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3415 char *bufs[3] = { NULL, key, val };
3418 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3420 if (KEY_IS(KEY_NEXT_ID)) {
3421 if (vallen != sizeof(obd_id))
3423 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3424 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3425 exp->exp_obd->obd_name,
3426 obd->u.cli.cl_oscc.oscc_next_id);
3431 if (KEY_IS("unlinked")) {
3432 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3433 spin_lock(&oscc->oscc_lock);
3434 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3435 spin_unlock(&oscc->oscc_lock);
3439 if (KEY_IS(KEY_INIT_RECOV)) {
3440 if (vallen != sizeof(int))
3442 spin_lock(&imp->imp_lock);
3443 imp->imp_initial_recov = *(int *)val;
3444 spin_unlock(&imp->imp_lock);
3445 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3446 exp->exp_obd->obd_name,
3447 imp->imp_initial_recov);
3451 if (KEY_IS("checksum")) {
3452 if (vallen != sizeof(int))
3454 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3458 if (KEY_IS(KEY_FLUSH_CTX)) {
3459 sptlrpc_import_flush_my_ctx(imp);
3466 /* We pass all other commands directly to OST. Since nobody calls osc
3467 methods directly and everybody is supposed to go through LOV, we
3468 assume lov checked invalid values for us.
3469 The only recognised values so far are evict_by_nid and mds_conn.
3470 Even if something bad goes through, we'd get a -EINVAL from OST
3471 anyway. */
3473 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3478 if (KEY_IS("mds_conn")) {
3479 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3481 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3482 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3483 LASSERT(oscc->oscc_oa.o_gr > 0);
3484 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3487 ptlrpc_req_set_repsize(req, 1, NULL);
3488 ptlrpc_set_add_req(set, req);
3489 ptlrpc_check_set(set);
3495 static struct llog_operations osc_size_repl_logops = {
3496 lop_cancel: llog_obd_repl_cancel
3499 static struct llog_operations osc_mds_ost_orig_logops;
3500 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3501 struct obd_device *tgt, int count,
3502 struct llog_catid *catid, struct obd_uuid *uuid)
3507 spin_lock(&obd->obd_dev_lock);
3508 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3509 osc_mds_ost_orig_logops = llog_lvfs_ops;
3510 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3511 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3512 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3513 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3515 spin_unlock(&obd->obd_dev_lock);
3517 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3518 &catid->lci_logid, &osc_mds_ost_orig_logops);
3520 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3524 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3525 &osc_size_repl_logops);
3527 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3530 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3531 obd->obd_name, tgt->obd_name, count, catid, rc);
3532 CERROR("logid "LPX64":0x%x\n",
3533 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3538 static int osc_llog_finish(struct obd_device *obd, int count)
3540 struct llog_ctxt *ctxt;
3541 int rc = 0, rc2 = 0;
3544 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3546 rc = llog_cleanup(ctxt);
3548 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3550 rc2 = llog_cleanup(ctxt);
3557 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3558 struct obd_uuid *cluuid,
3559 struct obd_connect_data *data)
3561 struct client_obd *cli = &obd->u.cli;
3563 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3566 client_obd_list_lock(&cli->cl_loi_list_lock);
3567 data->ocd_grant = cli->cl_avail_grant ?:
3568 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3569 lost_grant = cli->cl_lost_grant;
3570 cli->cl_lost_grant = 0;
3571 client_obd_list_unlock(&cli->cl_loi_list_lock);
3573 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3574 "cl_lost_grant: %ld\n", data->ocd_grant,
3575 cli->cl_avail_grant, lost_grant);
3576 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3577 " ocd_grant: %d\n", data->ocd_connect_flags,
3578 data->ocd_version, data->ocd_grant);
3584 static int osc_disconnect(struct obd_export *exp)
3586 struct obd_device *obd = class_exp2obd(exp);
3587 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3590 if (obd->u.cli.cl_conn_count == 1)
3591 /* flush any remaining cancel messages out to the target */
3592 llog_sync(ctxt, exp);
3594 rc = client_disconnect_export(exp);
3598 static int osc_import_event(struct obd_device *obd,
3599 struct obd_import *imp,
3600 enum obd_import_event event)
3602 struct client_obd *cli;
3606 LASSERT(imp->imp_obd == obd);
3609 case IMP_EVENT_DISCON: {
3610 /* Only do this on the MDS OSCs */
3611 if (imp->imp_server_timeout) {
3612 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3614 spin_lock(&oscc->oscc_lock);
3615 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3616 spin_unlock(&oscc->oscc_lock);
3619 client_obd_list_lock(&cli->cl_loi_list_lock);
3620 cli->cl_avail_grant = 0;
3621 cli->cl_lost_grant = 0;
3622 client_obd_list_unlock(&cli->cl_loi_list_lock);
3625 case IMP_EVENT_INACTIVE: {
3626 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3629 case IMP_EVENT_INVALIDATE: {
3630 struct ldlm_namespace *ns = obd->obd_namespace;
3634 client_obd_list_lock(&cli->cl_loi_list_lock);
3635 /* all pages go to failing rpcs due to the invalid import */
3636 osc_check_rpcs(cli);
3637 client_obd_list_unlock(&cli->cl_loi_list_lock);
3639 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3643 case IMP_EVENT_ACTIVE: {
3644 /* Only do this on the MDS OSCs */
3645 if (imp->imp_server_timeout) {
3646 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3648 spin_lock(&oscc->oscc_lock);
3649 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3650 spin_unlock(&oscc->oscc_lock);
3652 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3655 case IMP_EVENT_OCD: {
3656 struct obd_connect_data *ocd = &imp->imp_connect_data;
3658 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3659 osc_init_grant(&obd->u.cli, ocd);
3662 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3663 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3665 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3669 CERROR("Unknown import event %d\n", event);
3675 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3681 rc = ptlrpcd_addref();
3685 rc = client_obd_setup(obd, lcfg);
3689 struct lprocfs_static_vars lvars;
3690 struct client_obd *cli = &obd->u.cli;
3692 lprocfs_init_vars(osc, &lvars);
3693 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3694 lproc_osc_attach_seqstat(obd);
3695 ptlrpc_lprocfs_register_obd(obd);
3699 /* We need to allocate a few more requests, because
3700 brw_interpret_oap tries to create new requests before freeing
3701 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3702 reserved, but I'm afraid that might be too much wasted RAM
3703 in fact, so 2 is just my guess and should still work. */
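/* For example, if cl_max_rpcs_in_flight is 8 (assumed here purely for
 * illustration), the pool below is created with 10 requests. */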
3704 cli->cl_import->imp_rq_pool =
3705 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3707 ptlrpc_add_rqs_to_pool);
3713 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3719 case OBD_CLEANUP_EARLY: {
3720 struct obd_import *imp;
3721 imp = obd->u.cli.cl_import;
3722 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3723 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3724 ptlrpc_deactivate_import(imp);
3725 spin_lock(&imp->imp_lock);
3726 imp->imp_pingable = 0;
3727 spin_unlock(&imp->imp_lock);
3730 case OBD_CLEANUP_EXPORTS: {
3731 /* If we set up but never connected, the
3732 client import will not have been cleaned. */
3733 if (obd->u.cli.cl_import) {
3734 struct obd_import *imp;
3735 imp = obd->u.cli.cl_import;
3736 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3738 ptlrpc_invalidate_import(imp);
3739 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3740 class_destroy_import(imp);
3741 obd->u.cli.cl_import = NULL;
3745 case OBD_CLEANUP_SELF_EXP:
3746 rc = obd_llog_finish(obd, 0);
3748 CERROR("failed to cleanup llogging subsystems\n");
3750 case OBD_CLEANUP_OBD:
3756 int osc_cleanup(struct obd_device *obd)
3758 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3762 ptlrpc_lprocfs_unregister_obd(obd);
3763 lprocfs_obd_cleanup(obd);
3765 spin_lock(&oscc->oscc_lock);
3766 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3767 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3768 spin_unlock(&oscc->oscc_lock);
3770 /* free memory of osc quota cache */
3771 lquota_cleanup(quota_interface, obd);
3773 rc = client_obd_cleanup(obd);
3779 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3781 struct lustre_cfg *lcfg = buf;
3782 struct lprocfs_static_vars lvars;
3785 lprocfs_init_vars(osc, &lvars);
3787 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3791 struct obd_ops osc_obd_ops = {
3792 .o_owner = THIS_MODULE,
3793 .o_setup = osc_setup,
3794 .o_precleanup = osc_precleanup,
3795 .o_cleanup = osc_cleanup,
3796 .o_add_conn = client_import_add_conn,
3797 .o_del_conn = client_import_del_conn,
3798 .o_connect = client_connect_import,
3799 .o_reconnect = osc_reconnect,
3800 .o_disconnect = osc_disconnect,
3801 .o_statfs = osc_statfs,
3802 .o_statfs_async = osc_statfs_async,
3803 .o_packmd = osc_packmd,
3804 .o_unpackmd = osc_unpackmd,
3805 .o_precreate = osc_precreate,
3806 .o_create = osc_create,
3807 .o_destroy = osc_destroy,
3808 .o_getattr = osc_getattr,
3809 .o_getattr_async = osc_getattr_async,
3810 .o_setattr = osc_setattr,
3811 .o_setattr_async = osc_setattr_async,
3813 .o_brw_async = osc_brw_async,
3814 .o_prep_async_page = osc_prep_async_page,
3815 .o_queue_async_io = osc_queue_async_io,
3816 .o_set_async_flags = osc_set_async_flags,
3817 .o_queue_group_io = osc_queue_group_io,
3818 .o_trigger_group_io = osc_trigger_group_io,
3819 .o_teardown_async_page = osc_teardown_async_page,
3820 .o_punch = osc_punch,
3822 .o_enqueue = osc_enqueue,
3823 .o_match = osc_match,
3824 .o_change_cbdata = osc_change_cbdata,
3825 .o_cancel = osc_cancel,
3826 .o_cancel_unused = osc_cancel_unused,
3827 .o_join_lru = osc_join_lru,
3828 .o_iocontrol = osc_iocontrol,
3829 .o_get_info = osc_get_info,
3830 .o_set_info_async = osc_set_info_async,
3831 .o_import_event = osc_import_event,
3832 .o_llog_init = osc_llog_init,
3833 .o_llog_finish = osc_llog_finish,
3834 .o_process_config = osc_process_config,
3836 int __init osc_init(void)
3838 struct lprocfs_static_vars lvars;
3842 lprocfs_init_vars(osc, &lvars);
3844 request_module("lquota");
3845 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3846 lquota_init(quota_interface);
3847 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3849 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3850 LUSTRE_OSC_NAME, NULL);
3852 if (quota_interface)
3853 PORTAL_SYMBOL_PUT(osc_quota_interface);
3861 static void /*__exit*/ osc_exit(void)
3863 lquota_exit(quota_interface);
3864 if (quota_interface)
3865 PORTAL_SYMBOL_PUT(osc_quota_interface);
3867 class_unregister_type(LUSTRE_OSC_NAME);
3870 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3871 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3872 MODULE_LICENSE("GPL");
3874 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);