/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
 * Author Peter Braam <braam@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 *
 * For testing and management it is treated as an obd_device, although
 * it does not export a full OBD method table (the requests are coming
 * in over the wire, so object target modules do not have a full
 * method table.)
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#define DEBUG_SUBSYSTEM S_OSC
#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif
#include <lustre_dlm.h>
#include <libcfs/kp30.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);

atomic_t osc_resend_time;
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
        lmm_size = sizeof(**lmmp);
        OBD_FREE(*lmmp, lmm_size);
        OBD_ALLOC(*lmmp, lmm_size);
        LASSERT(lsm->lsm_object_id);
        LASSERT(lsm->lsm_object_gr);
        (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
        (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
        if (lmm_bytes < sizeof(*lmm)) {
                CERROR("lov_mds_md too small: %d, need %d\n",
                       lmm_bytes, (int)sizeof(*lmm));
        /* XXX LOV_MAGIC etc check? */
        if (lmm->lmm_object_id == 0) {
                CERROR("lov_mds_md: zero lmm_object_id\n");
        lsm_size = lov_stripe_md_size(1);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
        OBD_ALLOC(*lsmp, lsm_size);
        OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
        if ((*lsmp)->lsm_oinfo[0] == NULL) {
                OBD_FREE(*lsmp, lsm_size);
        loi_init((*lsmp)->lsm_oinfo[0]);

        /* XXX zero *lsmp? */
        (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
        (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
        LASSERT((*lsmp)->lsm_object_id);
        LASSERT((*lsmp)->lsm_object_gr);
        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
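
/* Copy the capability @capa into the request buffer at @offset and flag
 * its presence in the body's obdo (OBD_MD_FLOSSCAPA). */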
static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
                                 struct ost_body *body, void *capa)
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
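
/* Fill the ost_body at @offset of the request from @oinfo, then pack the
 * capability, if any, into the following buffer. */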
static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
                                     struct obd_info *oinfo)
        struct ost_body *body;

        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
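
/* Reply callback for async getattr: unpack the ost_body from the reply
 * and hand the attributes up to the caller through oi_cb_up(). */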
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

        /* This should really be sent by the OST */
        aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        CERROR("can't unpack ost_body\n");
        aa->aa_oi->oi_oa->o_valid = 0;
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
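
/* Queue an OST_GETATTR request on @set; osc_getattr_interpret() handles
 * the reply. */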
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);
        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        ptlrpc_set_add_req(set, req);
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);
        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        ptlrpc_req_set_repsize(req, 2, size);
        rc = ptlrpc_queue_wait(req);
        CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        ptlrpc_req_finished(req);
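
/* Synchronous setattr: send the attributes in @oinfo to the OST and copy
 * the updated obdo back from the reply. */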
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        ptlrpc_req_set_repsize(req, 2, size);
        rc = ptlrpc_queue_wait(req);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        GOTO(out, rc = -EPROTO);
        *oinfo->oi_oa = body->oa;
        ptlrpc_req_finished(req);
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);
        *aa->aa_oi->oi_oa = body->oa;
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
        struct osc_async_args *aa;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        ptlrpc_req_set_repsize(req, 2, size);
        /* do MDS-to-OST setattr asynchronously */
        /* Do not wait for the response. */
        ptlrpcd_add_req(req);
        req->rq_interpret_reply = osc_setattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        ptlrpc_set_add_req(rqset, req);
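
/* Create an object on the OST.  On success the object id/group from the
 * reply are copied into the lsm returned in *ea, and the transno and any
 * unlink llog cookie are recorded in @oti. */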
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        rc = obd_alloc_memmd(exp, &lsm);
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        GOTO(out, rc = -ENOMEM);
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                       "delorphan from OST integration");
                /* Don't resend the delorphan request */
                req->rq_no_resend = req->rq_no_delay = 1;
        rc = ptlrpc_queue_wait(req);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out_req, rc = -EPROTO);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way. */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
        if (oa->o_valid & OBD_MD_FLCOOKIE) {
                if (!oti->oti_logcookies)
                        oti_alloc_cookies(oti, 1);
                *oti->oti_logcookies = *obdo_logcookie(oa);
        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        ptlrpc_req_finished(req);
        obd_free_memmd(exp, &lsm);
static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
        struct ost_body *body;

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);
        *aa->aa_oi->oi_oa = body->oa;
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
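
/* Asynchronously truncate an object: the extent to punch travels in the
 * overloaded o_size/o_blocks fields of the oa. */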
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_punch_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        ptlrpc_set_add_req(rqset, req);
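
/* OST_SYNC: ask the OST to flush the given extent of @md to stable
 * storage. */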
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };

        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
        ptlrpc_req_set_repsize(req, 2, size);
        rc = ptlrpc_queue_wait(req);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("can't unpack ost_body\n");
        GOTO(out, rc = -EPROTO);
        ptlrpc_req_finished(req);
/* Find and cancel locally granted locks that match @mode in the resource
 * identified by @oa.  Matched locks are added to the @cancels list.
 * Returns the number of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);
/* Destroy requests can be async always on the client, and we don't even
 * really care about the return code since the client cannot do anything
 * at all about a failed destroy.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed).
 * If the client dies, or the OST is down when the object should be
 * destroyed, the records are not cancelled, and when the OST reconnects
 * to the MDS next, it will retrieve the llog unlink logs and then send
 * the log cancellation cookies to the MDS after committing the destroy
 * transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count, OST_DESTROY);
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        ptlrpc_req_set_repsize(req, 2, size);
        ptlrpcd_add_req(req);
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
        obd_flag bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
static unsigned long rpcs_in_flight(struct client_obd *cli)
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                cfs_waitq_signal(&ocw->ocw_waitq);
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);
                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                nob_read -= pga[i]->count;

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];
                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
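
/* Two brw_pages can share one niobuf only when the first ends exactly
 * where the second begins and their OBD_BRW_* flags match. */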
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
        return (p1->off + p1->count == p2->off);
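
/* Compute a crc32 over the first @nob bytes described by @pga, mapping
 * each page in turn; used for OBD_MD_FLCKSUM bulk verification. */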
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
        LASSERT(pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                nob -= pga[i]->count;

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
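
/* Build an OST_READ/OST_WRITE request: merge contiguous pages into
 * niobufs, attach a bulk descriptor for the page array, and stash the
 * I/O bookkeeping in the request's async args for the reply handler. */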
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa *capa;
        struct osc_brw_async_args *aa;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                pool = cli->cl_import->imp_rq_pool;
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
        size[REQ_REC_OFF + 3] = sizeof(*capa);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                capa_cpy(capa, ocapa);
                body->oa.o_valid |= OBD_MD_FLOSSCAPA;

        LASSERT(page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf->len += pg->count;
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        INIT_LIST_HEAD(&aa->aa_oaps);

        ptlrpc_req_finished(req);
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum,
                                int nob, obd_count page_count,
                                struct brw_page **pga)
        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
        new_cksum = osc_checksum_bulk(nob, page_count, pga);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT)

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        CERROR("Can't unpack body\n");

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                CERROR("Unexpected +ve rc %d\n", rc);
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         aa->aa_requested_nob,
                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                GOTO(out, rc = -EAGAIN);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                if (peer->nid == req->rq_bulk->bd_sender) {
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                   body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                   body->oa.o_generation : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                   body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        aa->aa_oa->o_cksum = client_cksum;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));

        *aa->aa_oa = body->oa;
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct l_wait_info lwi;

        cfs_waitq_init(&waitq);

        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);
        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);
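
/* Rebuild a brw request that failed with a recoverable error and queue
 * it on the original request set, handing the oaps and page array over
 * to the new request. */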
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                          OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);
        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        ptlrpc_set_add_req(set, new_req);
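
/* Completion callback for brws sent via async_internal(): finish the
 * request, redo it on recoverable errors, then release the write grant
 * and the page array. */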
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
        struct osc_brw_async_args *aa = data;

        rc = osc_brw_fini_request(req, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);

        spin_lock(&aa->aa_cli->cl_loi_list_lock);
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        spin_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
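
/* Prepare a brw request for @page_count pages and add it to @set,
 * consuming write grant up front and releasing it again if the prep
 * fails. */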
static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *lsm, obd_count page_count,
                          struct brw_page **pga, struct ptlrpc_request_set *set,
                          struct obd_capa *ocapa)
        struct ptlrpc_request *req;
        struct client_obd *cli = &exp->exp_obd->u.cli;

        /* Consume write credits even if doing a sync write -
         * otherwise we may run out of space on OST due to grant. */
        if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++) {
                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
                                osc_consume_write_grant(cli, pga[i]);
                spin_unlock(&cli->cl_loi_list_lock);

        rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
                req->rq_interpret_reply = brw_interpret;
                ptlrpc_set_add_req(set, req);
        } else if (cmd == OBD_BRW_WRITE) {
                spin_lock(&cli->cl_loi_list_lock);
                for (i = 0; i < page_count; i++)
                        osc_release_write_grant(cli, pga[i], 0);
                spin_unlock(&cli->cl_loi_list_lock);
/*
 * Ugh, we want disk allocation on the target to happen in offset order.
 * We'll follow Sedgewick's advice and stick to the dead simple shellsort
 * -- it'll do fine for our small page arrays and doesn't require
 * allocation.  It's an insertion sort that swaps elements that are
 * strides apart, shrinking the stride down until it's '1' and the array
 * is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
        struct brw_page *tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)
        for (i = stride; i < num; i++) {
                while (j >= stride && array[j - stride]->off > tmp->off) {
                        array[j] = array[j - stride];
        } while (stride > 1);
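
/* Return how many leading pages of the sorted array @pg fit in a single
 * unfragmented transfer: only the first page may start past a page
 * boundary and only the last may end short of one. */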
static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
        LASSERT(pages > 0);
        offset = pg[i]->off & ~CFS_PAGE_MASK;
        if (pages == 0) /* that's all */
        if (offset + pg[i]->count < CFS_PAGE_SIZE)
                return count; /* doesn't end on page boundary */
        offset = pg[i]->off & ~CFS_PAGE_MASK;
        if (offset != 0) /* doesn't start on page boundary */
static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
        struct brw_page **ppga;

        OBD_ALLOC(ppga, sizeof(*ppga) * count);
        for (i = 0; i < count; i++)
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                if (imp == NULL || imp->imp_invalid)
        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        orig = ppga = osc_build_ppga(pga, page_count);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                        pages_per_brw = page_count;
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        osc_release_ppga(orig, page_count_orig);
        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                if (imp == NULL || imp->imp_invalid)
        orig = ppga = osc_build_ppga(pga, page_count);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);
                        OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        osc_release_ppga(orig, page_count_orig);
static void osc_check_rpcs(struct client_obd *cli);
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
        if (lop->lop_num_pending == 0)

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page.  this is a wart for
                 * llite::commit_write() */
        if (lop->lop_num_pending >= optimal)
static void on_list(struct list_head *item, struct list_head *list,
        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
                cli->cl_pending_r_pages += delta;
/* this is called when a sync waiter receives an interruption.  Its job is
 * to get the caller woken as soon as possible.  If its page hasn't been
 * put in an rpc yet it can dequeue immediately.  Otherwise it has to mark
 * the rpc as desiring interruption, which will forcefully complete the
 * rpc once the rpc completes. */
static void osc_occ_interrupted(struct oig_callback_context *occ)
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists. */
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
/* this is trying to propagate async writeback errors back up to the
 * application.  As an async write fails we record the error code for later if
 * the app does an fsync.  As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
        ar->ar_force_sync = 1;
        ar->ar_min_xid = ptlrpc_sample_next_xid();
        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
static void osc_oap_to_pending(struct osc_async_page *oap)
        struct loi_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &oap->oap_loi->loi_write_lop;
                lop = &oap->oap_loi->loi_read_lop;

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;

                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked, so a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        /* upper layer wants to leave the page on pending queue */
        osc_oap_to_pending(oap);
        osc_exit_cache(cli, oap, sent);
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
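
/* Gather the oaps on @rpc_list into a sorted brw_page array, fill the
 * obdo via the caller's ap_fill_obdo/ap_update_obdo methods, and build
 * the brw request; the oaps move onto the new request's aa_oaps list. */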
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        RETURN(ERR_PTR(-ENOMEM));
        GOTO(out, req = ERR_PTR(-ENOMEM));

        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                ops = oap->oap_caller_ops;
                caller_data = oap->oap_caller_data;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);
        OBD_FREE(pga, sizeof(*pga) * page_count);
2037 /* the loi lock is held across this function but it's allowed to release
2038 * and reacquire it during its work */
2039 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2040 int cmd, struct loi_oap_pages *lop)
2042 struct ptlrpc_request *req;
2043 obd_count page_count = 0;
2044 struct osc_async_page *oap = NULL, *tmp;
2045 struct osc_brw_async_args *aa;
2046 struct obd_async_page_ops *ops;
2047 CFS_LIST_HEAD(rpc_list);
2048 unsigned int ending_offset;
2049 unsigned starting_offset = 0;
2052 /* first we find the pages we're allowed to work with */
2053 list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2055 ops = oap->oap_caller_ops;
2057 LASSERT(oap->oap_magic == OAP_MAGIC);
2059 /* in llite being 'ready' equates to the page being locked
2060 * until completion unlocks it. commit_write submits a page
2061 * as not ready because its unlock will happen unconditionally
2062 * as the call returns. if we race with commit_write giving
2063 * us that page we dont' want to create a hole in the page
2064 * stream, so we stop and leave the rpc to be fired by
2065 * another dirtier or kupdated interval (the not ready page
2066 * will still be on the dirty list). we could call in
2067 * at the end of ll_file_write to process the queue again. */
2068 if (!(oap->oap_async_flags & ASYNC_READY)) {
2069 int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2071 CDEBUG(D_INODE, "oap %p page %p returned %d "
2072 "instead of ready\n", oap,
2076 /* llite is telling us that the page is still
2077 * in commit_write and that we should try
2078 * and put it in an rpc again later. we
2079 * break out of the loop so we don't create
2080 * a hole in the sequence of pages in the rpc
2085 /* the io isn't needed.. tell the checks
2086 * below to complete the rpc with EINTR */
2087 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2088 oap->oap_count = -EINTR;
2091 oap->oap_async_flags |= ASYNC_READY;
2094 LASSERTF(0, "oap %p page %p returned %d "
2095 "from make_ready\n", oap,
2103 * Page submitted for IO has to be locked. Either by
2104 * ->ap_make_ready() or by higher layers.
2106 * XXX nikita: this assertion should be adjusted when lustre
2107 * starts using PG_writeback for pages being written out.
2109 #if defined(__KERNEL__) && defined(__LINUX__)
2110 LASSERT(PageLocked(oap->oap_page));
2112 /* If there is a gap at the start of this page, it can't merge
2113 * with any previous page, so we'll hand the network a
2114 * "fragmented" page array that it can't transfer in 1 RDMA */
2115 if (page_count != 0 && oap->oap_page_off != 0)
2118 /* take the page out of our book-keeping */
2119 list_del_init(&oap->oap_pending_item);
2120 lop_update_pending(cli, lop, cmd, -1);
2121 list_del_init(&oap->oap_urgent_item);
2123 if (page_count == 0)
2124 starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2125 (PTLRPC_MAX_BRW_SIZE - 1);
2127 /* ask the caller for the size of the io as the rpc leaves. */
2128 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2130 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2131 if (oap->oap_count <= 0) {
2132 CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2134 osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2138 /* now put the page back in our accounting */
2139 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2140 if (++page_count >= cli->cl_max_pages_per_rpc)
2143 /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
2144 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2145 * have the same alignment as the initial writes that allocated
2146 * extents on the server. */
2147 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2148 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2149 if (ending_offset == 0)
2152 /* If there is a gap at the end of this page, it can't merge
2153 * with any subsequent pages, so we'll hand the network a
2154 * "fragmented" page array that it can't transfer in 1 RDMA */
2155 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2159 osc_wake_cache_waiters(cli);
2161 if (page_count == 0)
2164 loi_list_maint(cli, loi);
2166 client_obd_list_unlock(&cli->cl_loi_list_lock);
2168 req = osc_build_req(cli, &rpc_list, page_count, cmd);
2170 /* this should happen rarely and is pretty bad, it makes the
2171 * pending list not follow the dirty order */
2172 client_obd_list_lock(&cli->cl_loi_list_lock);
2173 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2174 list_del_init(&oap->oap_rpc_item);
2176 /* queued sync pages can be torn down while they sit
2177 * between the pending list and the rpc */
2178 if (oap->oap_interrupted) {
2179 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2180 osc_ap_completion(cli, NULL, oap, 0,
2184 osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2186 loi_list_maint(cli, loi);
2187 RETURN(PTR_ERR(req));
2190 aa = (struct osc_brw_async_args *)&req->rq_async_args;
2192 if (cmd == OBD_BRW_READ) {
2193 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2194 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2195 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2196 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2197 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2199 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2200 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2201 cli->cl_w_in_flight);
2202 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2203 (starting_offset >> CFS_PAGE_SHIFT) + 1);
2204 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2207 client_obd_list_lock(&cli->cl_loi_list_lock);
2209 if (cmd == OBD_BRW_READ)
2210 cli->cl_r_in_flight++;
2212 cli->cl_w_in_flight++;
2214 /* queued sync pages can be torn down while they sit
2215 * between the pending list and the rpc */
2217 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2218 /* only one oap gets a request reference */
2221 if (oap->oap_interrupted && !req->rq_intr) {
2222 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2224 ptlrpc_mark_interrupted(req);
2228 tmp->oap_request = ptlrpc_request_addref(req);
2230 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2231 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2233 req->rq_interpret_reply = brw_interpret_oap;
2234 ptlrpcd_add_req(req);
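/* Note: the rpc is not waited on here; it is handed to the ptlrpcd
 * daemon, and brw_interpret_oap() runs in that context when the
 * reply (or an error) arrives, presumably completing the oaps
 * collected on aa->aa_oaps above. */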
2238 #define LOI_DEBUG(LOI, STR, args...) \
2239 CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
2240 !list_empty(&(LOI)->loi_cli_item), \
2241 (LOI)->loi_write_lop.lop_num_pending, \
2242 !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
2243 (LOI)->loi_read_lop.lop_num_pending, \
2244 !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
2247 /* This is called by osc_check_rpcs() to find which objects have pages that
2248 * we could be sending. These lists are maintained by lop_makes_rpc(). */
2249 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2252 /* first return all objects which we already know to have
2253 * pages ready to be stuffed into rpcs */
2254 if (!list_empty(&cli->cl_loi_ready_list))
2255 RETURN(list_entry(cli->cl_loi_ready_list.next,
2256 struct lov_oinfo, loi_cli_item));
2258 /* then if we have cache waiters, return all objects with queued
2259 * writes. This is especially important when many small files
2260 * have filled up the cache and not been fired into rpcs because
2261 * they don't pass the nr_pending/object threshold */
2262 if (!list_empty(&cli->cl_cache_waiters) &&
2263 !list_empty(&cli->cl_loi_write_list))
2264 RETURN(list_entry(cli->cl_loi_write_list.next,
2265 struct lov_oinfo, loi_write_item));
2267 /* then return all queued objects when we have an invalid import
2268 * so that they get flushed */
2269 if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2270 if (!list_empty(&cli->cl_loi_write_list))
2271 RETURN(list_entry(cli->cl_loi_write_list.next,
2272 struct lov_oinfo, loi_write_item));
2273 if (!list_empty(&cli->cl_loi_read_list))
2274 RETURN(list_entry(cli->cl_loi_read_list.next,
2275 struct lov_oinfo, loi_read_item));
2280 /* called with the loi list lock held */
2281 static void osc_check_rpcs(struct client_obd *cli)
2283 struct lov_oinfo *loi;
2284 int rc = 0, race_counter = 0;
2287 while ((loi = osc_next_loi(cli)) != NULL) {
2288 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2290 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2293 /* attempt some read/write balancing by alternating between
2294 * reads and writes in an object. The makes_rpc checks here
2295 * would be redundant if we were getting read/write work items
2296 * instead of objects. we don't want send_oap_rpc to drain a
2297 * partial read pending queue when we're given this object to
2298 * do write io while there are cache waiters */
2299 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2300 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2301 &loi->loi_write_lop);
2309 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2310 rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2311 &loi->loi_read_lop);
2320 /* attempt some inter-object balancing by issuing rpcs
2321 * for each object in turn */
2322 if (!list_empty(&loi->loi_cli_item))
2323 list_del_init(&loi->loi_cli_item);
2324 if (!list_empty(&loi->loi_write_item))
2325 list_del_init(&loi->loi_write_item);
2326 if (!list_empty(&loi->loi_read_item))
2327 list_del_init(&loi->loi_read_item);
2329 loi_list_maint(cli, loi);
2331 /* send_oap_rpc fails with 0 when make_ready tells it to
2332 * back off. llite's make_ready does this when it tries
2333 * to lock a page queued for write that is already locked.
2334 * we want to try sending rpcs from many objects, but we
2335 * don't want to spin failing with 0. */
2336 if (race_counter == 10)
2342 /* we're trying to queue a page in the osc so we're subject to the
2343 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2344 * If the osc's queued pages are already at that limit, then we want to sleep
2345 * until there is space in the osc's queue for us. We also may be waiting for
2346 * write credits from the OST if there are RPCs in flight that may return some
2347 * before we fall back to sync writes.
2349 * We need this to know that our allocation was granted in the presence of signals */
2350 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2354 client_obd_list_lock(&cli->cl_loi_list_lock);
2355 rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2356 client_obd_list_unlock(&cli->cl_loi_list_lock);
2360 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2361 * grant or cache space. */
2362 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2363 struct osc_async_page *oap)
2365 struct osc_cache_waiter ocw;
2366 struct l_wait_info lwi = { 0 };
2370 CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2371 "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2372 cli->cl_dirty_max, obd_max_dirty_pages,
2373 cli->cl_lost_grant, cli->cl_avail_grant);
2375 /* force the caller to try sync io. this can jump the list
2376 * of queued writes and create a discontiguous rpc stream */
2377 if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2378 loi->loi_ar.ar_force_sync)
2381 /* Hopefully normal case - cache space and write credits available */
2382 if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2383 (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2384 (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2385 /* account for ourselves */
2386 osc_consume_write_grant(cli, &oap->oap_brw_page);
2390 /* Make sure that there are write rpcs in flight to wait for. This
2391 * is a little silly as this object may not have any pending but
2392 * other objects sure might. */
2393 if (cli->cl_w_in_flight) {
2394 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2395 cfs_waitq_init(&ocw.ocw_waitq);
2399 loi_list_maint(cli, loi);
2400 osc_check_rpcs(cli);
2401 client_obd_list_unlock(&cli->cl_loi_list_lock);
2403 CDEBUG(D_CACHE, "sleeping for cache space\n");
2404 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
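/* Sketch of the waiter handshake implied by ocw_granted() above:
 * osc_wake_cache_waiters() grants space by unlinking ocw_entry and
 * waking ocw_waitq; otherwise we wake when rpcs_in_flight() drops to
 * zero and there is nothing left to wait for. The list_empty() check
 * below tells the two cases apart -- still linked means not granted,
 * so we unlink ourselves and the caller falls back to sync io. */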
2406 client_obd_list_lock(&cli->cl_loi_list_lock);
2407 if (!list_empty(&ocw.ocw_entry)) {
2408 list_del(&ocw.ocw_entry);
2417 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2418 struct lov_oinfo *loi, cfs_page_t *page,
2419 obd_off offset, struct obd_async_page_ops *ops,
2420 void *data, void **res)
2422 struct osc_async_page *oap;
2426 return size_round(sizeof(*oap));
2429 oap->oap_magic = OAP_MAGIC;
2430 oap->oap_cli = &exp->exp_obd->u.cli;
2433 oap->oap_caller_ops = ops;
2434 oap->oap_caller_data = data;
2436 oap->oap_page = page;
2437 oap->oap_obj_off = offset;
2439 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2440 CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2441 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2443 oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2445 CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2449 struct osc_async_page *oap_from_cookie(void *cookie)
2451 struct osc_async_page *oap = cookie;
2452 if (oap->oap_magic != OAP_MAGIC)
2453 return ERR_PTR(-EINVAL);
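/* Callers rely on the ERR_PTR()/IS_ERR()/PTR_ERR() convention, which
 * encodes a small negative errno in an otherwise-invalid pointer
 * value, e.g.:
 *
 *   oap = oap_from_cookie(cookie);
 *   if (IS_ERR(oap))
 *           RETURN(PTR_ERR(oap));
 */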
2457 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2458 struct lov_oinfo *loi, void *cookie,
2459 int cmd, obd_off off, int count,
2460 obd_flag brw_flags, enum async_flags async_flags)
2462 struct client_obd *cli = &exp->exp_obd->u.cli;
2463 struct osc_async_page *oap;
2467 oap = oap_from_cookie(cookie);
2469 RETURN(PTR_ERR(oap));
2471 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2474 if (!list_empty(&oap->oap_pending_item) ||
2475 !list_empty(&oap->oap_urgent_item) ||
2476 !list_empty(&oap->oap_rpc_item))
2479 /* check if the file's owner/group is over quota */
2480 #ifdef HAVE_QUOTA_SUPPORT
2481 if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2482 struct obd_async_page_ops *ops;
2489 ops = oap->oap_caller_ops;
2490 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2491 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2502 loi = lsm->lsm_oinfo[0];
2504 client_obd_list_lock(&cli->cl_loi_list_lock);
2507 oap->oap_page_off = off;
2508 oap->oap_count = count;
2509 oap->oap_brw_flags = brw_flags;
2510 oap->oap_async_flags = async_flags;
2512 if (cmd & OBD_BRW_WRITE) {
2513 rc = osc_enter_cache(cli, loi, oap);
2515 client_obd_list_unlock(&cli->cl_loi_list_lock);
2520 osc_oap_to_pending(oap);
2521 loi_list_maint(cli, loi);
2523 LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2526 osc_check_rpcs(cli);
2527 client_obd_list_unlock(&cli->cl_loi_list_lock);
2532 /* aka (~was & now & flag), but this is clearer :) */
2533 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
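/* For example: SETTING(0, ASYNC_READY, ASYNC_READY) is 1 (the flag
 * is being newly set), while SETTING(ASYNC_READY, ASYNC_READY,
 * ASYNC_READY) is 0 (it was already set). */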
2535 static int osc_set_async_flags(struct obd_export *exp,
2536 struct lov_stripe_md *lsm,
2537 struct lov_oinfo *loi, void *cookie,
2538 obd_flag async_flags)
2540 struct client_obd *cli = &exp->exp_obd->u.cli;
2541 struct loi_oap_pages *lop;
2542 struct osc_async_page *oap;
2546 oap = oap_from_cookie(cookie);
2548 RETURN(PTR_ERR(oap));
2551 * bug 7311: OST-side locking is only supported by liblustre for now
2552 * (and liblustre never calls obd_set_async_flags(), I hope); a generic
2553 * implementation would have to handle the case where an OST-locked
2554 * page was picked up by, e.g., ->writepage().
2556 LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2557 LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to tread here */
2560 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2564 loi = lsm->lsm_oinfo[0];
2566 if (oap->oap_cmd & OBD_BRW_WRITE) {
2567 lop = &loi->loi_write_lop;
2569 lop = &loi->loi_read_lop;
2572 client_obd_list_lock(&cli->cl_loi_list_lock);
2574 if (list_empty(&oap->oap_pending_item))
2575 GOTO(out, rc = -EINVAL);
2577 if ((oap->oap_async_flags & async_flags) == async_flags)
2580 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2581 oap->oap_async_flags |= ASYNC_READY;
2583 if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2584 if (list_empty(&oap->oap_rpc_item)) {
2585 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2586 loi_list_maint(cli, loi);
2590 LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2591 oap->oap_async_flags);
2593 osc_check_rpcs(cli);
2594 client_obd_list_unlock(&cli->cl_loi_list_lock);
2598 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2599 struct lov_oinfo *loi,
2600 struct obd_io_group *oig, void *cookie,
2601 int cmd, obd_off off, int count,
2603 obd_flag async_flags)
2605 struct client_obd *cli = &exp->exp_obd->u.cli;
2606 struct osc_async_page *oap;
2607 struct loi_oap_pages *lop;
2611 oap = oap_from_cookie(cookie);
2613 RETURN(PTR_ERR(oap));
2615 if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2618 if (!list_empty(&oap->oap_pending_item) ||
2619 !list_empty(&oap->oap_urgent_item) ||
2620 !list_empty(&oap->oap_rpc_item))
2624 loi = lsm->lsm_oinfo[0];
2626 client_obd_list_lock(&cli->cl_loi_list_lock);
2629 oap->oap_page_off = off;
2630 oap->oap_count = count;
2631 oap->oap_brw_flags = brw_flags;
2632 oap->oap_async_flags = async_flags;
2634 if (cmd & OBD_BRW_WRITE)
2635 lop = &loi->loi_write_lop;
2637 lop = &loi->loi_read_lop;
2639 list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2640 if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2642 rc = oig_add_one(oig, &oap->oap_occ);
2645 LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2646 oap, oap->oap_page, rc);
2648 client_obd_list_unlock(&cli->cl_loi_list_lock);
2653 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2654 struct loi_oap_pages *lop, int cmd)
2656 struct list_head *pos, *tmp;
2657 struct osc_async_page *oap;
2659 list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2660 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2661 list_del(&oap->oap_pending_item);
2662 osc_oap_to_pending(oap);
2664 loi_list_maint(cli, loi);
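/* list_for_each_safe() is required above because the loop body
 * unlinks each oap from lop_pending_group via list_del(); the _safe
 * variant caches the next pointer so the walk survives deletion. */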
2667 static int osc_trigger_group_io(struct obd_export *exp,
2668 struct lov_stripe_md *lsm,
2669 struct lov_oinfo *loi,
2670 struct obd_io_group *oig)
2672 struct client_obd *cli = &exp->exp_obd->u.cli;
2676 loi = lsm->lsm_oinfo[0];
2678 client_obd_list_lock(&cli->cl_loi_list_lock);
2680 osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2681 osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2683 osc_check_rpcs(cli);
2684 client_obd_list_unlock(&cli->cl_loi_list_lock);
2689 static int osc_teardown_async_page(struct obd_export *exp,
2690 struct lov_stripe_md *lsm,
2691 struct lov_oinfo *loi, void *cookie)
2693 struct client_obd *cli = &exp->exp_obd->u.cli;
2694 struct loi_oap_pages *lop;
2695 struct osc_async_page *oap;
2699 oap = oap_from_cookie(cookie);
2701 RETURN(PTR_ERR(oap));
2704 loi = lsm->lsm_oinfo[0];
2706 if (oap->oap_cmd & OBD_BRW_WRITE) {
2707 lop = &loi->loi_write_lop;
2709 lop = &loi->loi_read_lop;
2712 client_obd_list_lock(&cli->cl_loi_list_lock);
2714 if (!list_empty(&oap->oap_rpc_item))
2715 GOTO(out, rc = -EBUSY);
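/* An oap linked on oap_rpc_item already belongs to an rpc that is
 * being built or is in flight, so teardown refuses with -EBUSY
 * instead of yanking the page out from under the network. */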
2717 osc_exit_cache(cli, oap, 0);
2718 osc_wake_cache_waiters(cli);
2720 if (!list_empty(&oap->oap_urgent_item)) {
2721 list_del_init(&oap->oap_urgent_item);
2722 oap->oap_async_flags &= ~ASYNC_URGENT;
2724 if (!list_empty(&oap->oap_pending_item)) {
2725 list_del_init(&oap->oap_pending_item);
2726 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2728 loi_list_maint(cli, loi);
2730 LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2732 client_obd_list_unlock(&cli->cl_loi_list_lock);
2736 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2739 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2742 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2745 lock_res_and_lock(lock);
2748 /* Liang XXX: Darwin and Winnt checking should be added */
2749 if (lock->l_ast_data && lock->l_ast_data != data) {
2750 struct inode *new_inode = data;
2751 struct inode *old_inode = lock->l_ast_data;
2752 if (!(old_inode->i_state & I_FREEING))
2753 LDLM_ERROR(lock, "inconsistent l_ast_data found");
2754 LASSERTF(old_inode->i_state & I_FREEING,
2755 "Found existing inode %p/%lu/%u state %lu in lock: "
2756 "setting data to %p/%lu/%u\n", old_inode,
2757 old_inode->i_ino, old_inode->i_generation,
2759 new_inode, new_inode->i_ino, new_inode->i_generation);
2763 lock->l_ast_data = data;
2764 lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2765 unlock_res_and_lock(lock);
2766 LDLM_LOCK_PUT(lock);
2769 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2770 ldlm_iterator_t replace, void *data)
2772 struct ldlm_res_id res_id = { .name = {0} };
2773 struct obd_device *obd = class_exp2obd(exp);
2775 res_id.name[0] = lsm->lsm_object_id;
2776 res_id.name[2] = lsm->lsm_object_gr;
2778 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2782 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2788 /* The request was created before ldlm_cli_enqueue call. */
2789 if (rc == ELDLM_LOCK_ABORTED) {
2790 struct ldlm_reply *rep;
2792 /* swabbed by ldlm_cli_enqueue() */
2793 LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2794 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2796 LASSERT(rep != NULL);
2797 if (rep->lock_policy_res1)
2798 rc = rep->lock_policy_res1;
2802 if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2803 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2804 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2805 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2806 oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2809 /* Call the update callback. */
2810 rc = oinfo->oi_cb_up(oinfo, rc);
2814 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2815 struct osc_enqueue_args *aa, int rc)
2817 int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2818 struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2819 struct ldlm_lock *lock;
2821 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2823 lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2825 /* Complete obtaining the lock procedure. */
2826 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2828 &aa->oa_oi->oi_flags,
2829 &lsm->lsm_oinfo[0]->loi_lvb,
2830 sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2831 lustre_swab_ost_lvb,
2832 aa->oa_oi->oi_lockh, rc);
2834 /* Complete osc stuff. */
2835 rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2837 /* Release the lock for async request. */
2838 if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2839 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2841 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2842 aa->oa_oi->oi_lockh, req, aa);
2843 LDLM_LOCK_PUT(lock);
2847 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2848 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2849 * other synchronous requests; however, keeping some locks and trying to
2850 * obtain others may take a considerable amount of time in the case of an
2851 * ost failure, and when a client does not release locks that other sync
2852 * requests wait on, the client is excluded from the cluster -- such
2853 * scenarios make life difficult, so release locks just after they are obtained. */
2854 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2855 struct ldlm_enqueue_info *einfo,
2856 struct ptlrpc_request_set *rqset)
2858 struct ldlm_res_id res_id = { .name = {0} };
2859 struct obd_device *obd = exp->exp_obd;
2860 struct ldlm_reply *rep;
2861 struct ptlrpc_request *req = NULL;
2862 int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2866 res_id.name[0] = oinfo->oi_md->lsm_object_id;
2867 res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2869 /* Filesystem lock extents are extended to page boundaries so that
2870 * dealing with the page cache is a little smoother. */
2871 oinfo->oi_policy.l_extent.start -=
2872 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2873 oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
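/* A worked example of the rounding above with 4096-byte pages
 * (~CFS_PAGE_MASK == 4095): start = 5000 becomes 5000 - (5000 & 4095)
 * = 4096, and end = 9000 becomes 9000 | 4095 = 12287, i.e. the extent
 * is widened outward to cover whole pages. */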
2875 if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2878 /* Next, search for already existing extent locks that will cover us */
2879 rc = ldlm_lock_match(obd->obd_namespace,
2880 oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2881 einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2884 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2887 /* I would like to be able to ASSERT here that rss <=
2888 * kms, but I can't, for reasons which are explained in
2892 /* We already have a lock, and it's referenced */
2893 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2895 /* For async requests, decref the lock. */
2897 ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2902 /* If we're trying to read, we also search for an existing PW lock. The
2903 * VFS and page cache already protect us locally, so lots of readers/
2904 * writers can share a single PW lock.
2906 * There are problems with conversion deadlocks, so instead of
2907 * converting a read lock to a write lock, we'll just enqueue a new
2910 * At some point we should cancel the read lock instead of making them
2911 * send us a blocking callback, but there are problems with canceling
2912 * locks out from other users right now, too. */
2914 if (einfo->ei_mode == LCK_PR) {
2915 rc = ldlm_lock_match(obd->obd_namespace,
2916 oinfo->oi_flags | LDLM_FL_LVB_READY,
2917 &res_id, einfo->ei_type, &oinfo->oi_policy,
2918 LCK_PW, oinfo->oi_lockh);
2920 /* FIXME: This is not incredibly elegant, but it might
2921 * be more elegant than adding another parameter to
2922 * lock_match. I want a second opinion. */
2923 /* addref the lock only if not async requests. */
2925 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2926 osc_set_data_with_check(oinfo->oi_lockh,
2929 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2930 ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2938 [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2939 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
2940 [DLM_LOCKREQ_OFF + 1] = 0 };
2942 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2946 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2947 size[DLM_REPLY_REC_OFF] =
2948 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2949 ptlrpc_req_set_repsize(req, 3, size);
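/* The reply is sized for three buffers: the ptlrpc_body, the
 * ldlm_reply at DLM_LOCKREPLY_OFF, and the ost_lvb (size/blocks/
 * mtime) at DLM_REPLY_REC_OFF, matching the unpacking done in
 * osc_enqueue_fini() and osc_enqueue_interpret() above. */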
2952 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2953 oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2955 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2956 &oinfo->oi_policy, &oinfo->oi_flags,
2957 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2958 sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2959 lustre_swab_ost_lvb, oinfo->oi_lockh,
2963 struct osc_enqueue_args *aa;
2964 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2965 aa = (struct osc_enqueue_args *)&req->rq_async_args;
2970 req->rq_interpret_reply = osc_enqueue_interpret;
2971 ptlrpc_set_add_req(rqset, req);
2972 } else if (intent) {
2973 ptlrpc_req_finished(req);
2978 rc = osc_enqueue_fini(req, oinfo, intent, rc);
2980 ptlrpc_req_finished(req);
2985 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2986 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2987 int *flags, void *data, struct lustre_handle *lockh)
2989 struct ldlm_res_id res_id = { .name = {0} };
2990 struct obd_device *obd = exp->exp_obd;
2992 int lflags = *flags;
2995 res_id.name[0] = lsm->lsm_object_id;
2996 res_id.name[2] = lsm->lsm_object_gr;
2998 OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3000 /* Filesystem lock extents are extended to page boundaries so that
3001 * dealing with the page cache is a little smoother */
3002 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3003 policy->l_extent.end |= ~CFS_PAGE_MASK;
3005 /* Next, search for already existing extent locks that will cover us */
3006 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3007 &res_id, type, policy, mode, lockh);
3009 //if (!(*flags & LDLM_FL_TEST_LOCK))
3010 osc_set_data_with_check(lockh, data, lflags);
3013 /* If we're trying to read, we also search for an existing PW lock. The
3014 * VFS and page cache already protect us locally, so lots of readers/
3015 * writers can share a single PW lock. */
3016 if (mode == LCK_PR) {
3017 rc = ldlm_lock_match(obd->obd_namespace,
3018 lflags | LDLM_FL_LVB_READY, &res_id,
3019 type, policy, LCK_PW, lockh);
3020 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
3021 /* FIXME: This is not incredibly elegant, but it might
3022 * be more elegant than adding another parameter to
3023 * lock_match. I want a second opinion. */
3024 osc_set_data_with_check(lockh, data, lflags);
3025 ldlm_lock_addref(lockh, LCK_PR);
3026 ldlm_lock_decref(lockh, LCK_PW);
3032 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3033 __u32 mode, struct lustre_handle *lockh)
3037 if (unlikely(mode == LCK_GROUP))
3038 ldlm_lock_decref_and_cancel(lockh, mode);
3040 ldlm_lock_decref(lockh, mode);
3045 static int osc_cancel_unused(struct obd_export *exp,
3046 struct lov_stripe_md *lsm, int flags,
3049 struct obd_device *obd = class_exp2obd(exp);
3050 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3053 res_id.name[0] = lsm->lsm_object_id;
3054 res_id.name[2] = lsm->lsm_object_gr;
3058 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3061 static int osc_join_lru(struct obd_export *exp,
3062 struct lov_stripe_md *lsm, int join)
3064 struct obd_device *obd = class_exp2obd(exp);
3065 struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3068 res_id.name[0] = lsm->lsm_object_id;
3069 res_id.name[2] = lsm->lsm_object_gr;
3073 return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3076 static int osc_statfs_interpret(struct ptlrpc_request *req,
3077 struct osc_async_args *aa, int rc)
3079 struct obd_statfs *msfs;
3085 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3086 lustre_swab_obd_statfs);
3088 CERROR("Can't unpack obd_statfs\n");
3089 GOTO(out, rc = -EPROTO);
3092 memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3094 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3098 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3099 __u64 max_age, struct ptlrpc_request_set *rqset)
3101 struct ptlrpc_request *req;
3102 struct osc_async_args *aa;
3103 int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3106 /* We could possibly pass max_age in the request (as an absolute
3107 * timestamp or a "seconds.usec ago") so the target can avoid doing
3108 * extra calls into the filesystem if that isn't necessary (e.g.
3109 * during mount that would help a bit). Having relative timestamps
3110 * is not so great if request processing is slow, while absolute
3111 * timestamps are not ideal because they need time synchronization. */
3112 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3113 OST_STATFS, 1, NULL, NULL);
3117 ptlrpc_req_set_repsize(req, 2, size);
3118 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3120 req->rq_interpret_reply = osc_statfs_interpret;
3121 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3122 aa = (struct osc_async_args *)&req->rq_async_args;
3125 ptlrpc_set_add_req(rqset, req);
3129 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3132 struct obd_statfs *msfs;
3133 struct ptlrpc_request *req;
3134 int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3137 /* We could possibly pass max_age in the request (as an absolute
3138 * timestamp or a "seconds.usec ago") so the target can avoid doing
3139 * extra calls into the filesystem if that isn't necessary (e.g.
3140 * during mount that would help a bit). Having relative timestamps
3141 * is not so great if request processing is slow, while absolute
3142 * timestamps are not ideal because they need time synchronization. */
3143 req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3144 OST_STATFS, 1, NULL, NULL);
3148 ptlrpc_req_set_repsize(req, 2, size);
3149 req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3151 rc = ptlrpc_queue_wait(req);
3155 msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3156 lustre_swab_obd_statfs);
3158 CERROR("Can't unpack obd_statfs\n");
3159 GOTO(out, rc = -EPROTO);
3162 memcpy(osfs, msfs, sizeof(*osfs));
3166 ptlrpc_req_finished(req);
3170 /* Retrieve object striping information.
3172 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3173 * the maximum number of OST indices which will fit in the user buffer.
3174 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3176 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3178 struct lov_user_md lum, *lumk;
3179 int rc = 0, lum_size;
3185 if (copy_from_user(&lum, lump, sizeof(lum)))
3188 if (lum.lmm_magic != LOV_USER_MAGIC)
3191 if (lum.lmm_stripe_count > 0) {
3192 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3193 OBD_ALLOC(lumk, lum_size);
3197 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3198 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3200 lum_size = sizeof(lum);
3204 lumk->lmm_object_id = lsm->lsm_object_id;
3205 lumk->lmm_object_gr = lsm->lsm_object_gr;
3206 lumk->lmm_stripe_count = 1;
3208 if (copy_to_user(lump, lumk, lum_size))
3212 OBD_FREE(lumk, lum_size);
3218 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3219 void *karg, void *uarg)
3221 struct obd_device *obd = exp->exp_obd;
3222 struct obd_ioctl_data *data = karg;
3226 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3229 if (!try_module_get(THIS_MODULE)) {
3230 CERROR("Can't get module. Is it alive?");
3235 case OBD_IOC_LOV_GET_CONFIG: {
3237 struct lov_desc *desc;
3238 struct obd_uuid uuid;
3242 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3243 GOTO(out, err = -EINVAL);
3245 data = (struct obd_ioctl_data *)buf;
3247 if (sizeof(*desc) > data->ioc_inllen1) {
3248 obd_ioctl_freedata(buf, len);
3249 GOTO(out, err = -EINVAL);
3252 if (data->ioc_inllen2 < sizeof(uuid)) {
3253 obd_ioctl_freedata(buf, len);
3254 GOTO(out, err = -EINVAL);
3257 desc = (struct lov_desc *)data->ioc_inlbuf1;
3258 desc->ld_tgt_count = 1;
3259 desc->ld_active_tgt_count = 1;
3260 desc->ld_default_stripe_count = 1;
3261 desc->ld_default_stripe_size = 0;
3262 desc->ld_default_stripe_offset = 0;
3263 desc->ld_pattern = 0;
3264 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3266 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3268 err = copy_to_user((void *)uarg, buf, len);
3271 obd_ioctl_freedata(buf, len);
3274 case LL_IOC_LOV_SETSTRIPE:
3275 err = obd_alloc_memmd(exp, karg);
3279 case LL_IOC_LOV_GETSTRIPE:
3280 err = osc_getstripe(karg, uarg);
3282 case OBD_IOC_CLIENT_RECOVER:
3283 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3288 case IOC_OSC_SET_ACTIVE:
3289 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3292 case OBD_IOC_POLL_QUOTACHECK:
3293 err = lquota_poll_check(quota_interface, exp,
3294 (struct if_quotacheck *)karg);
3297 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3298 cmd, cfs_curproc_comm());
3299 GOTO(out, err = -ENOTTY);
3302 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3305 module_put(THIS_MODULE);
3310 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3311 void *key, __u32 *vallen, void *val)
3314 if (!vallen || !val)
3317 if (keylen > strlen("lock_to_stripe") &&
3318 strcmp(key, "lock_to_stripe") == 0) {
3319 __u32 *stripe = val;
3320 *vallen = sizeof(*stripe);
3323 } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3324 struct ptlrpc_request *req;
3326 char *bufs[2] = { NULL, key };
3327 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3329 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3330 OST_GET_INFO, 2, size, bufs);
3334 size[REPLY_REC_OFF] = *vallen;
3335 ptlrpc_req_set_repsize(req, 2, size);
3336 rc = ptlrpc_queue_wait(req);
3340 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3341 lustre_swab_ost_last_id);
3342 if (reply == NULL) {
3343 CERROR("Can't unpack OST last ID\n");
3344 GOTO(out, rc = -EPROTO);
3346 *((obd_id *)val) = *reply;
3348 ptlrpc_req_finished(req);
3354 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3357 struct llog_ctxt *ctxt;
3358 struct obd_import *imp = req->rq_import;
3364 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3367 rc = llog_initiator_connect(ctxt);
3369 CERROR("cannot establish connection for "
3370 "ctxt %p: %d\n", ctxt, rc);
3373 spin_lock(&imp->imp_lock);
3374 imp->imp_server_timeout = 1;
3375 imp->imp_pingable = 1;
3376 spin_unlock(&imp->imp_lock);
3377 CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3382 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3383 void *key, obd_count vallen, void *val,
3384 struct ptlrpc_request_set *set)
3386 struct ptlrpc_request *req;
3387 struct obd_device *obd = exp->exp_obd;
3388 struct obd_import *imp = class_exp2cliimp(exp);
3389 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3390 char *bufs[3] = { NULL, key, val };
3393 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3395 if (KEY_IS(KEY_NEXT_ID)) {
3396 if (vallen != sizeof(obd_id))
3398 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3399 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3400 exp->exp_obd->obd_name,
3401 obd->u.cli.cl_oscc.oscc_next_id);
3406 if (KEY_IS("unlinked")) {
3407 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3408 spin_lock(&oscc->oscc_lock);
3409 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3410 spin_unlock(&oscc->oscc_lock);
3414 if (KEY_IS(KEY_INIT_RECOV)) {
3415 if (vallen != sizeof(int))
3417 spin_lock(&imp->imp_lock);
3418 imp->imp_initial_recov = *(int *)val;
3419 spin_unlock(&imp->imp_lock);
3420 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3421 exp->exp_obd->obd_name,
3422 imp->imp_initial_recov);
3426 if (KEY_IS("checksum")) {
3427 if (vallen != sizeof(int))
3429 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3433 if (KEY_IS(KEY_FLUSH_CTX)) {
3434 sptlrpc_import_flush_my_ctx(imp);
3441 /* We pass all other commands directly to the OST. Since nobody calls osc
3442 methods directly and everybody is supposed to go through LOV, we
3443 assume LOV rejected invalid values for us.
3444 The only recognised values so far are evict_by_nid and mds_conn.
3445 Even if something bad goes through, we'd get a -EINVAL from the OST
3448 req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3453 if (KEY_IS("mds_conn")) {
3454 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3456 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3457 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3458 LASSERT(oscc->oscc_oa.o_gr > 0);
3459 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3462 ptlrpc_req_set_repsize(req, 1, NULL);
3463 ptlrpc_set_add_req(set, req);
3464 ptlrpc_check_set(set);
3470 static struct llog_operations osc_size_repl_logops = {
3471 lop_cancel: llog_obd_repl_cancel
3474 static struct llog_operations osc_mds_ost_orig_logops;
3475 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3476 struct obd_device *tgt, int count,
3477 struct llog_catid *catid, struct obd_uuid *uuid)
3482 spin_lock(&obd->obd_dev_lock);
3483 if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3484 osc_mds_ost_orig_logops = llog_lvfs_ops;
3485 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3486 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3487 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3488 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3490 spin_unlock(&obd->obd_dev_lock);
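/* The shared static osc_mds_ost_orig_logops is cloned from
 * llog_lvfs_ops and patched lazily on first use; the lop_setup
 * comparison under obd_dev_lock keeps it from being initialized
 * twice by concurrent callers. */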
3492 rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3493 &catid->lci_logid, &osc_mds_ost_orig_logops);
3495 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3499 rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3500 &osc_size_repl_logops);
3502 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3505 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3506 obd->obd_name, tgt->obd_name, count, catid, rc);
3507 CERROR("logid "LPX64":0x%x\n",
3508 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3513 static int osc_llog_finish(struct obd_device *obd, int count)
3515 struct llog_ctxt *ctxt;
3516 int rc = 0, rc2 = 0;
3519 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3521 rc = llog_cleanup(ctxt);
3523 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3525 rc2 = llog_cleanup(ctxt);
3532 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3533 struct obd_uuid *cluuid,
3534 struct obd_connect_data *data)
3536 struct client_obd *cli = &obd->u.cli;
3538 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3541 client_obd_list_lock(&cli->cl_loi_list_lock);
3542 data->ocd_grant = cli->cl_avail_grant ?:
3543 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3544 lost_grant = cli->cl_lost_grant;
3545 cli->cl_lost_grant = 0;
3546 client_obd_list_unlock(&cli->cl_loi_list_lock);
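/* Note the GNU "x ?: y" shorthand above: if any grant survived the
 * disconnect we re-request exactly that amount, otherwise we ask the
 * OST for two full rpcs' worth of pages, converted to bytes with
 * CFS_PAGE_SHIFT. */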
3548 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3549 "cl_lost_grant: %ld\n", data->ocd_grant,
3550 cli->cl_avail_grant, lost_grant);
3551 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3552 " ocd_grant: %d\n", data->ocd_connect_flags,
3553 data->ocd_version, data->ocd_grant);
3559 static int osc_disconnect(struct obd_export *exp)
3561 struct obd_device *obd = class_exp2obd(exp);
3562 struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3565 if (obd->u.cli.cl_conn_count == 1)
3566 /* flush any remaining cancel messages out to the target */
3567 llog_sync(ctxt, exp);
3569 rc = client_disconnect_export(exp);
3573 static int osc_import_event(struct obd_device *obd,
3574 struct obd_import *imp,
3575 enum obd_import_event event)
3577 struct client_obd *cli;
3581 LASSERT(imp->imp_obd == obd);
3584 case IMP_EVENT_DISCON: {
3585 /* Only do this on the MDS OSCs */
3586 if (imp->imp_server_timeout) {
3587 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3589 spin_lock(&oscc->oscc_lock);
3590 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3591 spin_unlock(&oscc->oscc_lock);
3594 client_obd_list_lock(&cli->cl_loi_list_lock);
3595 cli->cl_avail_grant = 0;
3596 cli->cl_lost_grant = 0;
3597 client_obd_list_unlock(&cli->cl_loi_list_lock);
3600 case IMP_EVENT_INACTIVE: {
3601 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3604 case IMP_EVENT_INVALIDATE: {
3605 struct ldlm_namespace *ns = obd->obd_namespace;
3609 client_obd_list_lock(&cli->cl_loi_list_lock);
3610 /* all pages go to failing rpcs due to the invalid import */
3611 osc_check_rpcs(cli);
3612 client_obd_list_unlock(&cli->cl_loi_list_lock);
3614 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3618 case IMP_EVENT_ACTIVE: {
3619 /* Only do this on the MDS OSCs */
3620 if (imp->imp_server_timeout) {
3621 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3623 spin_lock(&oscc->oscc_lock);
3624 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3625 spin_unlock(&oscc->oscc_lock);
3627 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3630 case IMP_EVENT_OCD: {
3631 struct obd_connect_data *ocd = &imp->imp_connect_data;
3633 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3634 osc_init_grant(&obd->u.cli, ocd);
3637 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3638 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3640 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3644 CERROR("Unknown import event %d\n", event);
3650 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3656 rc = ptlrpcd_addref();
3660 rc = client_obd_setup(obd, lcfg);
3664 struct lprocfs_static_vars lvars;
3665 struct client_obd *cli = &obd->u.cli;
3667 lprocfs_init_vars(osc, &lvars);
3668 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3669 lproc_osc_attach_seqstat(obd);
3670 ptlrpc_lprocfs_register_obd(obd);
3674 /* We need to allocate a few more requests, because
3675 brw_interpret_oap tries to create new requests before freeing
3676 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3677 reserved, but I'm afraid that might be too much wasted RAM
3678 in practice, so 2 is just my guess and should still work. */
3679 cli->cl_import->imp_rq_pool =
3680 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3682 ptlrpc_add_rqs_to_pool);
3688 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3694 case OBD_CLEANUP_EARLY: {
3695 struct obd_import *imp;
3696 imp = obd->u.cli.cl_import;
3697 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3698 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3699 ptlrpc_deactivate_import(imp);
3700 spin_lock(&imp->imp_lock);
3701 imp->imp_pingable = 0;
3702 spin_unlock(&imp->imp_lock);
3705 case OBD_CLEANUP_EXPORTS: {
3706 /* If we set up but never connected, the
3707 client import will not have been cleaned. */
3708 if (obd->u.cli.cl_import) {
3709 struct obd_import *imp;
3710 imp = obd->u.cli.cl_import;
3711 CDEBUG(D_CONFIG, "%s: client import never connected\n",
3713 ptlrpc_invalidate_import(imp);
3714 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3715 class_destroy_import(imp);
3716 obd->u.cli.cl_import = NULL;
3720 case OBD_CLEANUP_SELF_EXP:
3721 rc = obd_llog_finish(obd, 0);
3723 CERROR("failed to cleanup llogging subsystems\n");
3725 case OBD_CLEANUP_OBD:
3731 int osc_cleanup(struct obd_device *obd)
3733 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3737 ptlrpc_lprocfs_unregister_obd(obd);
3738 lprocfs_obd_cleanup(obd);
3740 spin_lock(&oscc->oscc_lock);
3741 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3742 oscc->oscc_flags |= OSCC_FLAG_EXITING;
3743 spin_unlock(&oscc->oscc_lock);
3745 /* free memory of osc quota cache */
3746 lquota_cleanup(quota_interface, obd);
3748 rc = client_obd_cleanup(obd);
3754 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3756 struct lustre_cfg *lcfg = buf;
3757 struct lprocfs_static_vars lvars;
3760 lprocfs_init_vars(osc, &lvars);
3762 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3766 struct obd_ops osc_obd_ops = {
3767 .o_owner = THIS_MODULE,
3768 .o_setup = osc_setup,
3769 .o_precleanup = osc_precleanup,
3770 .o_cleanup = osc_cleanup,
3771 .o_add_conn = client_import_add_conn,
3772 .o_del_conn = client_import_del_conn,
3773 .o_connect = client_connect_import,
3774 .o_reconnect = osc_reconnect,
3775 .o_disconnect = osc_disconnect,
3776 .o_statfs = osc_statfs,
3777 .o_statfs_async = osc_statfs_async,
3778 .o_packmd = osc_packmd,
3779 .o_unpackmd = osc_unpackmd,
3780 .o_precreate = osc_precreate,
3781 .o_create = osc_create,
3782 .o_destroy = osc_destroy,
3783 .o_getattr = osc_getattr,
3784 .o_getattr_async = osc_getattr_async,
3785 .o_setattr = osc_setattr,
3786 .o_setattr_async = osc_setattr_async,
3788 .o_brw_async = osc_brw_async,
3789 .o_prep_async_page = osc_prep_async_page,
3790 .o_queue_async_io = osc_queue_async_io,
3791 .o_set_async_flags = osc_set_async_flags,
3792 .o_queue_group_io = osc_queue_group_io,
3793 .o_trigger_group_io = osc_trigger_group_io,
3794 .o_teardown_async_page = osc_teardown_async_page,
3795 .o_punch = osc_punch,
3797 .o_enqueue = osc_enqueue,
3798 .o_match = osc_match,
3799 .o_change_cbdata = osc_change_cbdata,
3800 .o_cancel = osc_cancel,
3801 .o_cancel_unused = osc_cancel_unused,
3802 .o_join_lru = osc_join_lru,
3803 .o_iocontrol = osc_iocontrol,
3804 .o_get_info = osc_get_info,
3805 .o_set_info_async = osc_set_info_async,
3806 .o_import_event = osc_import_event,
3807 .o_llog_init = osc_llog_init,
3808 .o_llog_finish = osc_llog_finish,
3809 .o_process_config = osc_process_config,
3811 int __init osc_init(void)
3813 struct lprocfs_static_vars lvars;
3817 lprocfs_init_vars(osc, &lvars);
3819 request_module("lquota");
3820 quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3821 lquota_init(quota_interface);
3822 init_obd_quota_ops(quota_interface, &osc_obd_ops);
3824 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3825 LUSTRE_OSC_NAME, NULL);
3827 if (quota_interface)
3828 PORTAL_SYMBOL_PUT(osc_quota_interface);
3836 static void /*__exit*/ osc_exit(void)
3838 lquota_exit(quota_interface);
3839 if (quota_interface)
3840 PORTAL_SYMBOL_PUT(osc_quota_interface);
3842 class_unregister_type(LUSTRE_OSC_NAME);
3845 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3846 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3847 MODULE_LICENSE("GPL");
3849 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);