4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
42 # include <liblustre.h>
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
/* Forward declarations for helpers used before their definitions.  The two
 * static functions are necessarily defined in this translation unit;
 * osc_cleanup() has external linkage. */
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66 struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
69 /* Pack OSC object metadata for disk storage (LE byte order).
 *
 * The packed form is exactly one struct lov_mds_md (lmm_size is
 * sizeof(**lmmp)).  The buffer at *lmmp is released with OBD_FREE() on one
 * path and allocated with OBD_ALLOC() on another.  When packing, the object
 * id and sequence of @lsm are stored as little-endian 64-bit values; the
 * object id is asserted to be non-zero and the sequence is checked with
 * LASSERT_SEQ_IS_MDT(). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71 struct lov_stripe_md *lsm)
76 lmm_size = sizeof(**lmmp);
81 OBD_FREE(*lmmp, lmm_size);
87 OBD_ALLOC(*lmmp, lmm_size);
93 LASSERT(lsm->lsm_object_id);
94 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
102 /* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * @exp supplies the client import whose connect data may carry the maximum
 * object size; @lsmp is the in/out in-memory stripe descriptor; @lmm and
 * @lmm_bytes describe the on-disk buffer.
 *
 * A buffer shorter than struct lov_mds_md and a zero lmm_object_id are both
 * reported with CERROR().  The in-memory descriptor is sized for a single
 * stripe (lov_stripe_md_size(1)).  An existing *lsmp passed with a NULL @lmm
 * is freed together with its lsm_oinfo[0].  Otherwise *lsmp and its
 * lsm_oinfo[0] are allocated (the former is freed again if the latter cannot
 * be allocated), the oinfo is initialised with loi_init(), and the object id
 * and sequence are converted from little-endian.  lsm_maxbytes comes from the
 * import connect data when OBD_CONNECT_MAXBYTES was negotiated and is
 * LUSTRE_STRIPE_MAXBYTES otherwise. */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104 struct lov_mds_md *lmm, int lmm_bytes)
107 struct obd_import *imp = class_exp2cliimp(exp);
111 if (lmm_bytes < sizeof (*lmm)) {
112 CERROR("lov_mds_md too small: %d, need %d\n",
113 lmm_bytes, (int)sizeof(*lmm));
116 /* XXX LOV_MAGIC etc check? */
118 if (lmm->lmm_object_id == 0) {
119 CERROR("lov_mds_md: zero lmm_object_id\n");
124 lsm_size = lov_stripe_md_size(1);
128 if (*lsmp != NULL && lmm == NULL) {
129 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130 OBD_FREE(*lsmp, lsm_size);
136 OBD_ALLOC(*lsmp, lsm_size);
139 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141 OBD_FREE(*lsmp, lsm_size);
144 loi_init((*lsmp)->lsm_oinfo[0]);
148 /* XXX zero *lsmp? */
149 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151 LASSERT((*lsmp)->lsm_object_id);
152 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
156 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
159 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Pack a capability into the RMF_CAPA1 field of @req.
 *
 * @capa is treated as a struct obd_capa.  The client-side RMF_CAPA1 buffer is
 * fetched from the request capsule, OBD_MD_FLOSSCAPA is set in
 * body->oa.o_valid so the receiver knows a capability is present, and the
 * packed capability is traced with DEBUG_CAPA(). */
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165 struct ost_body *body, void *capa)
167 struct obd_capa *oc = (struct obd_capa *)capa;
168 struct lustre_capa *c;
173 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
176 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the OST body of @req from @oinfo.
 *
 * Copies oinfo->oi_oa into the request in wire format with
 * lustre_set_wire_obdo() and then packs oinfo->oi_capa with
 * osc_pack_capa(). */
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181 struct obd_info *oinfo)
183 struct ost_body *body;
185 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
188 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side size of capability field @field in @req before the
 * request is packed.
 *
 * One path shrinks the field to zero bytes; on the other path the size
 * already computed for the field is kept.
 * NOTE(review): presumably the zero-size path is taken when the caller passes
 * no capability; confirm against the full function body. */
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193 const struct req_msg_field *field,
197 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199 /* it is already calculated as sizeof struct obd_capa */
/* Reply handler installed by osc_getattr_async() for OST_GETATTR requests.
 *
 * When the reply body can be fetched, its obdo is copied into
 * aa->aa_oi->oi_oa and the block size is forced to PTLRPC_MAX_BRW_SIZE (with
 * OBD_MD_FLBLKSZ marked valid).  When the body cannot be unpacked, a debug
 * message is logged and o_valid is cleared.  In all cases the caller-supplied
 * completion callback oi_cb_up is invoked with the resulting rc and its
 * return value becomes rc. */
203 static int osc_getattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_async_args *aa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218 /* This should really be sent by the OST */
219 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222 CDEBUG(D_INFO, "can't unpack ost_body\n");
224 aa->aa_oi->oi_oa->o_valid = 0;
227 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR for the object described by @oinfo.
 *
 * Builds an RQF_OST_GETATTR request on the client import of @exp, packs the
 * obdo and capability from @oinfo, installs osc_getattr_interpret() as the
 * reply handler and adds the request to @set.  The request is released with
 * ptlrpc_request_free() on the packing error path. */
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232 struct ptlrpc_request_set *set)
234 struct ptlrpc_request *req;
235 struct osc_async_args *aa;
239 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
243 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246 ptlrpc_request_free(req);
250 osc_pack_req_body(req, oinfo);
252 ptlrpc_request_set_replen(req);
253 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* The handler arguments live inside the request itself, in rq_async_args;
 * check at compile time that they fit. */
255 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256 aa = ptlrpc_req_async_args(req);
259 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR for the object described by @oinfo.
 *
 * Builds and packs an RQF_OST_GETATTR request, sends it with
 * ptlrpc_queue_wait(), and copies the reply obdo into oinfo->oi_oa.  The
 * block size is forced to PTLRPC_MAX_BRW_SIZE with OBD_MD_FLBLKSZ marked
 * valid.  The reply-body error path sets rc to -EPROTO.  The request
 * reference is dropped with ptlrpc_req_finished(). */
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264 struct obd_info *oinfo)
266 struct ptlrpc_request *req;
267 struct ost_body *body;
271 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278 ptlrpc_request_free(req);
282 osc_pack_req_body(req, oinfo);
284 ptlrpc_request_set_replen(req);
286 rc = ptlrpc_queue_wait(req);
290 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292 GOTO(out, rc = -EPROTO);
294 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
297 /* This should really be sent by the OST */
298 oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR for the object described by @oinfo.
 *
 * The caller must have OBD_MD_FLGROUP set in oinfo->oi_oa->o_valid (this is
 * asserted).  An RQF_OST_SETATTR request is built, packed and sent with
 * ptlrpc_queue_wait(); the reply obdo is then copied back into
 * oinfo->oi_oa.  The reply-body error path sets rc to -EPROTO.  The request
 * reference is dropped with ptlrpc_req_finished(). */
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308 struct obd_info *oinfo, struct obd_trans_info *oti)
310 struct ptlrpc_request *req;
311 struct ost_body *body;
315 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324 ptlrpc_request_free(req);
328 osc_pack_req_body(req, oinfo);
330 ptlrpc_request_set_replen(req);
332 rc = ptlrpc_queue_wait(req);
336 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338 GOTO(out, rc = -EPROTO);
340 lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
344 ptlrpc_req_finished(req);
/* Reply handler shared by the asynchronous setattr and punch paths (both
 * osc_setattr_async_base() and osc_punch_base() install it).
 *
 * Copies the reply obdo into sa->sa_oa, with -EPROTO as the reply-body error
 * code, and then reports the result to the caller through
 * sa->sa_upcall(sa->sa_cookie, rc), whose return value becomes rc. */
348 static int osc_setattr_interpret(const struct lu_env *env,
349 struct ptlrpc_request *req,
350 struct osc_setattr_args *sa, int rc)
352 struct ost_body *body;
358 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360 GOTO(out, rc = -EPROTO);
362 lustre_get_wire_obdo(sa->sa_oa, &body->oa);
364 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Build and queue an asynchronous OST_SETATTR request.
 *
 * @upcall and @cookie are stored in the request and invoked from
 * osc_setattr_interpret() when the reply arrives.  When @oti is given and
 * OBD_MD_FLCOOKIE is valid in the obdo, the first llog cookie of @oti is
 * copied into the obdo before it is packed.
 *
 * Two submission paths are visible: a fire-and-forget path that hands the
 * request to ptlrpcd without installing a reply handler, and a path that
 * installs the handler and then adds the request either to ptlrpcd (when
 * @rqset is PTLRPCD_SET) or to @rqset.
 * NOTE(review): presumably the fire-and-forget path is selected when no
 * request set is supplied; confirm against the full function body. */
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369 struct obd_trans_info *oti,
370 obd_enqueue_update_f upcall, void *cookie,
371 struct ptlrpc_request_set *rqset)
373 struct ptlrpc_request *req;
374 struct osc_setattr_args *sa;
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385 ptlrpc_request_free(req);
389 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
392 osc_pack_req_body(req, oinfo);
394 ptlrpc_request_set_replen(req);
396 /* do mds to ost setattr asynchronously */
398 /* Do not wait for response. */
399 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
401 req->rq_interpret_reply =
402 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* The handler arguments are kept in the request rq_async_args area. */
404 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405 sa = ptlrpc_req_async_args(req);
406 sa->sa_oa = oinfo->oi_oa;
407 sa->sa_upcall = upcall;
408 sa->sa_cookie = cookie;
410 if (rqset == PTLRPCD_SET)
411 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413 ptlrpc_set_add_req(rqset, req);
/* Asynchronous setattr entry point: a thin wrapper around
 * osc_setattr_async_base() that uses oinfo->oi_cb_up as the completion
 * upcall and @oinfo itself as the upcall cookie. */
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420 struct obd_trans_info *oti,
421 struct ptlrpc_request_set *rqset)
423 return osc_setattr_async_base(exp, oinfo, oti,
424 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronously create an object on the OST with an OST_CREATE request.
 *
 * @oa is sent in wire format and overwritten with the reply obdo; its block
 * size is then forced to PTLRPC_MAX_BRW_SIZE.  A request whose o_flags equals
 * OBD_FL_DELORPHAN (with OBD_MD_FLFLAGS valid) is marked no-resend and
 * no-delay.  After a successful reply the object id and sequence are
 * recorded in the stripe descriptor, the reply transaction number is stored
 * in @oti, and when OBD_MD_FLCOOKIE is valid the llog cookie is copied into
 * @oti (allocating cookie storage first if @oti has none).
 *
 * The visible error codes are -ENOMEM on the request-allocation path and
 * -EPROTO on the reply-body path.  The request is released with
 * ptlrpc_req_finished() and the stripe descriptor obtained from
 * obd_alloc_memmd() is released with obd_free_memmd() on the exit path. */
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428 struct lov_stripe_md **ea, struct obd_trans_info *oti)
430 struct ptlrpc_request *req;
431 struct ost_body *body;
432 struct lov_stripe_md *lsm;
441 rc = obd_alloc_memmd(exp, &lsm);
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
448 GOTO(out, rc = -ENOMEM);
450 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
452 ptlrpc_request_free(req);
456 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
458 lustre_set_wire_obdo(&body->oa, oa);
460 ptlrpc_request_set_replen(req);
462 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463 oa->o_flags == OBD_FL_DELORPHAN) {
465 "delorphan from OST integration");
466 /* Don't resend the delorphan req */
467 req->rq_no_resend = req->rq_no_delay = 1;
470 rc = ptlrpc_queue_wait(req);
474 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
476 GOTO(out_req, rc = -EPROTO);
478 lustre_get_wire_obdo(oa, &body->oa);
480 /* This should really be sent by the OST */
481 oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482 oa->o_valid |= OBD_MD_FLBLKSZ;
484 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485 * have valid lsm_oinfo data structs, so don't go touching that.
486 * This needs to be fixed in a big way.
488 lsm->lsm_object_id = oa->o_id;
489 lsm->lsm_object_seq = oa->o_seq;
493 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
495 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496 if (!oti->oti_logcookies)
497 oti_alloc_cookies(oti, 1);
498 *oti->oti_logcookies = oa->o_lcookie;
502 CDEBUG(D_HA, "transno: "LPD64"\n",
503 lustre_msg_get_transno(req->rq_repmsg));
505 ptlrpc_req_finished(req);
508 obd_free_memmd(exp, &lsm);
/* Build and queue an asynchronous OST_PUNCH request for oinfo->oi_oa.
 *
 * The request is directed to OST_IO_PORTAL and its timeout is recomputed
 * with ptlrpc_at_set_req_timeout().  Completion is handled by
 * osc_setattr_interpret(), which calls @upcall with @cookie.  The request is
 * handed to ptlrpcd when @rqset is PTLRPCD_SET and is otherwise added to
 * @rqset. */
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513 obd_enqueue_update_f upcall, void *cookie,
514 struct ptlrpc_request_set *rqset)
516 struct ptlrpc_request *req;
517 struct osc_setattr_args *sa;
518 struct ost_body *body;
522 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
529 ptlrpc_request_free(req);
532 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533 ptlrpc_at_set_req_timeout(req);
535 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
537 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538 osc_pack_capa(req, body, oinfo->oi_capa);
540 ptlrpc_request_set_replen(req);
542 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
/* The handler arguments are kept in the request rq_async_args area. */
543 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544 sa = ptlrpc_req_async_args(req);
545 sa->sa_oa = oinfo->oi_oa;
546 sa->sa_upcall = upcall;
547 sa->sa_cookie = cookie;
548 if (rqset == PTLRPCD_SET)
549 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
551 ptlrpc_set_add_req(rqset, req);
/* Punch (truncate) entry point.
 *
 * The extent to punch is taken from oinfo->oi_policy.l_extent and carried in
 * the obdo by overloading o_size with the start offset and o_blocks with the
 * end offset (both marked valid).  The request itself is issued by
 * osc_punch_base() with oinfo->oi_cb_up as the upcall and @oinfo as its
 * cookie. */
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557 struct obd_info *oinfo, struct obd_trans_info *oti,
558 struct ptlrpc_request_set *rqset)
560 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
561 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563 return osc_punch_base(exp, oinfo,
564 oinfo->oi_cb_up, oinfo, rqset);
/* Reply handler installed by osc_sync() for OST_SYNC requests.
 *
 * Copies the reply obdo into aa->aa_oi->oi_oa by plain structure assignment
 * and then invokes the completion callback oi_cb_up with rc.  The reply-body
 * error path logs an error and sets rc to -EPROTO.
 * NOTE(review): the other reply handlers in this file copy the obdo with
 * lustre_get_wire_obdo(); confirm the direct assignment here is intended. */
567 static int osc_sync_interpret(const struct lu_env *env,
568 struct ptlrpc_request *req,
571 struct osc_async_args *aa = arg;
572 struct ost_body *body;
578 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
580 CERROR ("can't unpack ost_body\n");
581 GOTO(out, rc = -EPROTO);
584 *aa->aa_oi->oi_oa = body->oa;
586 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_SYNC covering the byte range [@start, @end] of
 * the object described by @oinfo.
 *
 * The range is carried in the request obdo by overloading o_size with @start
 * and o_blocks with @end.  osc_sync_interpret() is installed as the reply
 * handler and the request is added to @set. */
590 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
591 struct obd_info *oinfo, obd_size start, obd_size end,
592 struct ptlrpc_request_set *set)
594 struct ptlrpc_request *req;
595 struct ost_body *body;
596 struct osc_async_args *aa;
601 CDEBUG(D_INFO, "oa NULL\n");
605 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
609 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
610 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
612 ptlrpc_request_free(req);
616 /* overload the size and blocks fields in the oa with start/end */
617 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
619 lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
620 body->oa.o_size = start;
621 body->oa.o_blocks = end;
622 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
623 osc_pack_capa(req, body, oinfo->oi_capa);
625 ptlrpc_request_set_replen(req);
626 req->rq_interpret_reply = osc_sync_interpret;
/* The handler arguments are kept in the request rq_async_args area. */
628 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
629 aa = ptlrpc_req_async_args(req);
632 ptlrpc_set_add_req(set, req);
636 /* Find and cancel locally locks matched by @mode in the resource found by
637 * @objid. Found locks are added into @cancel list. Returns the amount of
638 * locks added to @cancels list. */
/* The resource name is built from the object id and sequence in @oa and is
 * looked up in the namespace of the obd device behind @exp.  The resource
 * reference taken by ldlm_resource_get() is dropped again with
 * ldlm_resource_putref() once ldlm_cancel_resource_local() has run. */
639 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
641 ldlm_mode_t mode, int lock_flags)
643 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
644 struct ldlm_res_id res_id;
645 struct ldlm_resource *res;
649 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
650 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
654 LDLM_RESOURCE_ADDREF(res);
655 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
656 lock_flags, 0, NULL);
657 LDLM_RESOURCE_DELREF(res);
658 ldlm_resource_putref(res);
/* Reply handler for throttled OST_DESTROY requests (installed by
 * osc_destroy()).
 *
 * Drops the count of destroy RPCs in flight for the client behind the
 * request import and wakes a waiter on cl_destroy_waitq so that a blocked
 * osc_destroy() can re-check whether it may send. */
662 static int osc_destroy_interpret(const struct lu_env *env,
663 struct ptlrpc_request *req, void *data,
666 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
668 cfs_atomic_dec(&cli->cl_destroy_in_flight);
669 cfs_waitq_signal(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more destroy RPC.
 *
 * The in-flight counter is incremented optimistically; if the new value does
 * not exceed cl_max_rpcs_in_flight the destroy may be sent and the slot stays
 * reserved.  Otherwise the increment is undone.  If the value after undoing
 * is already below the limit, another RPC completed in between, so a waiter
 * on cl_destroy_waitq is woken to avoid a lost wakeup. */
673 static int osc_can_send_destroy(struct client_obd *cli)
675 if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
676 cli->cl_max_rpcs_in_flight) {
677 /* The destroy request can be sent */
680 if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
681 cli->cl_max_rpcs_in_flight) {
683 * The counter has been modified between the two atomic
686 cfs_waitq_signal(&cli->cl_destroy_waitq);
691 /* Destroy requests can be async always on the client, and we don't even really
692 * care about the return code since the client cannot do anything at all about
694 * When the MDS is unlinking a filename, it saves the file objects into a
695 * recovery llog, and these object records are cancelled when the OST reports
696 * they were destroyed and sync'd to disk (i.e. transaction committed).
697 * If the client dies, or the OST is down when the object should be destroyed,
698 * the records are not cancelled, and when the OST reconnects to the MDS next,
699 * it will retrieve the llog unlink logs and then sends the log cancellation
700 * cookies to the MDS after committing destroy transactions. */
/* Implementation outline: unused local PW locks on the object are collected
 * for cancellation with LDLM_FL_DISCARD_DATA, an OST_DESTROY request is
 * prepared with ldlm_prep_elc_req() and directed to OST_IO_PORTAL, and the
 * request is handed to ptlrpcd without waiting for the reply.  When @oti is
 * given and OBD_MD_FLCOOKIE is valid, the first llog cookie of @oti is copied
 * into @oa before it is packed.  Unless the import was connected with
 * OBD_CONNECT_MDS, the number of destroys in flight is throttled: the caller
 * sleeps on cl_destroy_waitq until osc_can_send_destroy() succeeds, and
 * osc_destroy_interpret() releases the slot on completion.
 * NOTE(review): the collected cancels are presumably passed to
 * ldlm_prep_elc_req() so they travel with the destroy; confirm against the
 * full argument list. */
701 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
702 struct obdo *oa, struct lov_stripe_md *ea,
703 struct obd_trans_info *oti, struct obd_export *md_export,
706 struct client_obd *cli = &exp->exp_obd->u.cli;
707 struct ptlrpc_request *req;
708 struct ost_body *body;
709 CFS_LIST_HEAD(cancels);
714 CDEBUG(D_INFO, "oa NULL\n");
718 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
719 LDLM_FL_DISCARD_DATA);
721 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
723 ldlm_lock_list_put(&cancels, l_bl_ast, count);
727 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
728 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
731 ptlrpc_request_free(req);
735 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
736 ptlrpc_at_set_req_timeout(req);
738 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
739 oa->o_lcookie = *oti->oti_logcookies;
740 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
742 lustre_set_wire_obdo(&body->oa, oa);
744 osc_pack_capa(req, body, (struct obd_capa *)capa);
745 ptlrpc_request_set_replen(req);
747 /* don't throttle destroy RPCs for the MDT */
748 if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
749 req->rq_interpret_reply = osc_destroy_interpret;
750 if (!osc_can_send_destroy(cli)) {
751 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
755 * Wait until the number of on-going destroy RPCs drops
756 * under max_rpc_in_flight
758 l_wait_event_exclusive(cli->cl_destroy_waitq,
759 osc_can_send_destroy(cli), &lwi);
763 /* Do not wait for response */
764 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Report the cache and grant state of @cli to the server through @oa.
 *
 * @oa must not already have OBD_MD_FLBLOCKS or OBD_MD_FLGRANT valid (this is
 * asserted).  Under cl_loi_list_lock:
 *  - o_dirty is set to cl_dirty;
 *  - three sanity checks (per-client dirty above cl_dirty_max, system-wide
 *    dirty pages above obd_max_dirty_pages, and an implausibly large
 *    difference between cl_dirty_max and cl_dirty) log an error;
 *  - otherwise o_undirty is the larger of cl_dirty_max and the byte size of
 *    (cl_max_rpcs_in_flight + 1) full RPCs;
 *  - o_grant is set to cl_avail_grant;
 *  - o_dropped is set to cl_lost_grant, which is then reset to zero so the
 *    loss is reported only once. */
768 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
771 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
773 LASSERT(!(oa->o_valid & bits));
776 client_obd_list_lock(&cli->cl_loi_list_lock);
777 oa->o_dirty = cli->cl_dirty;
778 if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
779 CERROR("dirty %lu - %lu > dirty_max %lu\n",
780 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
782 } else if (cfs_atomic_read(&obd_dirty_pages) -
783 cfs_atomic_read(&obd_dirty_transit_pages) >
784 obd_max_dirty_pages + 1){
785 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
786 * not covered by a lock thus they may safely race and trip
787 * this CERROR() unless we add in a small fudge factor (+1). */
788 CERROR("dirty %d - %d > system dirty_max %d\n",
789 cfs_atomic_read(&obd_dirty_pages),
790 cfs_atomic_read(&obd_dirty_transit_pages),
791 obd_max_dirty_pages);
793 } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
794 CERROR("dirty %lu - dirty_max %lu too big???\n",
795 cli->cl_dirty, cli->cl_dirty_max);
798 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
799 (cli->cl_max_rpcs_in_flight + 1);
800 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
802 oa->o_grant = cli->cl_avail_grant;
803 oa->o_dropped = cli->cl_lost_grant;
804 cli->cl_lost_grant = 0;
805 client_obd_list_unlock(&cli->cl_loi_list_lock);
806 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
807 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant shrink for @cli: cl_next_shrink_grant becomes the
 * current time shifted by cl_grant_shrink_interval (via cfs_time_shift()),
 * and the new deadline is traced. */
811 void osc_update_next_shrink(struct client_obd *cli)
813 cli->cl_next_shrink_grant =
814 cfs_time_shift(cli->cl_grant_shrink_interval);
815 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
816 cli->cl_next_shrink_grant);
/* Add @grant to the available grant of @cli while holding
 * cl_loi_list_lock. */
819 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
821 client_obd_list_lock(&cli->cl_loi_list_lock);
822 cli->cl_avail_grant += grant;
823 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Credit @cli with any extra grant carried in a reply body.  Nothing is done
 * unless OBD_MD_FLGRANT is valid in the body obdo. */
826 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
828 if (body->oa.o_valid & OBD_MD_FLGRANT) {
829 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
830 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() calls this before its
 * definition appears in the file. */
834 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
835 obd_count keylen, void *key, obd_count vallen,
836 void *val, struct ptlrpc_request_set *set);
/* Reply handler for a grant-shrink request.
 *
 * The obdo that was sent is recovered from the handler arguments (a struct
 * osc_grant_args).  One path returns the grant that was offered for release
 * (oa->o_grant) to the client with __osc_update_grant(); the other fetches
 * the reply body and credits any grant it carries with osc_update_grant().
 * NOTE(review): presumably the first path is the failure case, undoing the
 * local deduction; confirm against the full function body. */
838 static int osc_shrink_grant_interpret(const struct lu_env *env,
839 struct ptlrpc_request *req,
842 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
843 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
844 struct ost_body *body;
847 __osc_update_grant(cli, oa->o_grant);
851 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
853 osc_update_grant(cli, body);
/* Give back a quarter of the available grant by piggy-backing the release on
 * an obdo that is about to be sent.
 *
 * Under cl_loi_list_lock, one quarter of cl_avail_grant is moved into
 * oa->o_grant and deducted from the client.  OBD_MD_FLFLAGS is marked valid
 * if it was not already, OBD_FL_SHRINK_GRANT is set in o_flags, and the next
 * shrink time is rescheduled. */
859 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
861 client_obd_list_lock(&cli->cl_loi_list_lock);
862 oa->o_grant = cli->cl_avail_grant / 4;
863 cli->cl_avail_grant -= oa->o_grant;
864 client_obd_list_unlock(&cli->cl_loi_list_lock);
865 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
866 oa->o_valid |= OBD_MD_FLFLAGS;
869 oa->o_flags |= OBD_FL_SHRINK_GRANT;
870 osc_update_next_shrink(cli);
873 /* Shrink the current grant, either from some large amount to enough for a
874 * full set of in-flight RPCs, or if we have already shrunk to that limit
875 * then to enough for a single RPC. This avoids keeping more grant than
876 * needed, and avoids shrinking the grant piecemeal. */
/* The chosen target is passed to osc_shrink_grant_to_target().
 * NOTE(review): target is computed here as a page count (RPC count times
 * cl_max_pages_per_rpc) but is compared with cl_avail_grant, which
 * osc_should_shrink_grant() compares with the byte-sized
 * PTLRPC_MAX_BRW_SIZE; confirm that the units agree. */
877 static int osc_shrink_grant(struct client_obd *cli)
879 long target = (cli->cl_max_rpcs_in_flight + 1) *
880 cli->cl_max_pages_per_rpc;
882 client_obd_list_lock(&cli->cl_loi_list_lock);
883 if (cli->cl_avail_grant <= target)
884 target = cli->cl_max_pages_per_rpc;
885 client_obd_list_unlock(&cli->cl_loi_list_lock);
887 return osc_shrink_grant_to_target(cli, target);
/* Shrink the available grant of @cli down to @target and tell the server.
 *
 * @target is first raised to at least cl_max_pages_per_rpc.  When the client
 * holds no more than @target nothing needs to be released and the lock is
 * simply dropped.  Otherwise the surplus (cl_avail_grant - target) is moved
 * into body->oa.o_grant, flagged with OBD_FL_SHRINK_GRANT, and sent to the
 * server as the value of a KEY_GRANT_SHRINK set_info request.  The final
 * visible statement credits body->oa.o_grant back to the client.
 * NOTE(review): presumably that credit happens only when the set_info call
 * fails; confirm against the full function body.
 * NOTE(review): the lower bound applied to @target is a page count while
 * cl_avail_grant is compared with byte sizes elsewhere in this file; confirm
 * the units expected by callers. */
890 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
893 struct ost_body *body;
896 client_obd_list_lock(&cli->cl_loi_list_lock);
897 /* Don't shrink if we are already above or below the desired limit
898 * We don't want to shrink below a single RPC, as that will negatively
899 * impact block allocation and long-term performance. */
900 if (target < cli->cl_max_pages_per_rpc)
901 target = cli->cl_max_pages_per_rpc;
903 if (target >= cli->cl_avail_grant) {
904 client_obd_list_unlock(&cli->cl_loi_list_lock);
907 client_obd_list_unlock(&cli->cl_loi_list_lock);
913 osc_announce_cached(cli, &body->oa, 0);
915 client_obd_list_lock(&cli->cl_loi_list_lock);
916 body->oa.o_grant = cli->cl_avail_grant - target;
917 cli->cl_avail_grant = target;
918 client_obd_list_unlock(&cli->cl_loi_list_lock);
919 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
920 body->oa.o_valid |= OBD_MD_FLFLAGS;
921 body->oa.o_flags = 0;
923 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
924 osc_update_next_shrink(cli);
926 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
927 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
928 sizeof(*body), body, NULL);
930 __osc_update_grant(cli, body->oa.o_grant);
/* The available grant must exceed this value (PTLRPC_MAX_BRW_SIZE) before
 * osc_should_shrink_grant() considers a shrink. */
935 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
/* Decide whether it is time for @client to shrink its grant.
 *
 * Shrinking is only considered when the server negotiated
 * OBD_CONNECT_GRANT_SHRINK.  Once the current time is within five ticks of
 * (or past) cl_next_shrink_grant, a shrink is due if the import is in
 * LUSTRE_IMP_FULL state and the available grant exceeds GRANT_SHRINK_LIMIT;
 * when those conditions do not hold the next shrink time is pushed back with
 * osc_update_next_shrink(). */
936 static int osc_should_shrink_grant(struct client_obd *client)
938 cfs_time_t time = cfs_time_current();
939 cfs_time_t next_shrink = client->cl_next_shrink_grant;
941 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
942 OBD_CONNECT_GRANT_SHRINK) == 0)
945 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
946 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
947 client->cl_avail_grant > GRANT_SHRINK_LIMIT)
950 osc_update_next_shrink(client);
/* Timeout callback registered by osc_add_shrink_grant().  Walks every client
 * linked on item->ti_obd_list (through cl_grant_shrink_list) and shrinks the
 * grant of each one for which osc_should_shrink_grant() says a shrink is
 * due. */
955 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
957 struct client_obd *client;
959 cfs_list_for_each_entry(client, &item->ti_obd_list,
960 cl_grant_shrink_list) {
961 if (osc_should_shrink_grant(client))
962 osc_shrink_grant(client);
/* Register @client for periodic grant shrinking.
 *
 * The client is added through ptlrpc_add_timeout_client(), passing
 * cl_grant_shrink_interval, osc_grant_shrink_grant_cb() as the callback and
 * cl_grant_shrink_list as the list linkage.  A registration error is logged
 * with the device name; otherwise the registration is traced and the first
 * shrink time is scheduled. */
967 static int osc_add_shrink_grant(struct client_obd *client)
971 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
973 osc_grant_shrink_grant_cb, NULL,
974 &client->cl_grant_shrink_list);
976 CERROR("add grant client %s error %d\n",
977 client->cl_import->imp_obd->obd_name, rc);
980 CDEBUG(D_CACHE, "add grant client %s \n",
981 client->cl_import->imp_obd->obd_name);
982 osc_update_next_shrink(client);
/* Unregister @client from periodic grant shrinking; returns the result of
 * ptlrpc_del_timeout_client() on its cl_grant_shrink_list linkage. */
986 static int osc_del_shrink_grant(struct client_obd *client)
988 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise the available grant of @cli from the connect data @ocd.
 *
 * Under cl_loi_list_lock, cl_avail_grant is set to ocd_grant when the import
 * is in LUSTRE_IMP_EVICTED state; the other visible assignment subtracts the
 * currently dirty amount (cl_dirty) from ocd_grant.  A negative result is
 * treated as a server-side accounting quirk: a warning is logged and
 * ocd_grant is used as is.  After the lock is dropped the resulting values
 * are traced, and if the server supports OBD_CONNECT_GRANT_SHRINK and the
 * client is not yet on a shrink list it is registered with
 * osc_add_shrink_grant(). */
992 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
995 * ocd_grant is the total grant amount we're expect to hold: if we've
996 * been evicted, it's the new avail_grant amount, cl_dirty will drop
997 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
999 * race is tolerable here: if we're evicted, but imp_state already
1000 * left EVICTED state, then cl_dirty must be 0 already.
1002 client_obd_list_lock(&cli->cl_loi_list_lock);
1003 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1004 cli->cl_avail_grant = ocd->ocd_grant;
1006 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1008 if (cli->cl_avail_grant < 0) {
1009 CWARN("%s: available grant < 0, the OSS is probably not running"
1010 " with patch from bug20278 (%ld) \n",
1011 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1012 /* workaround for 1.6 servers which do not have
1013 * the patch from bug20278 */
1014 cli->cl_avail_grant = ocd->ocd_grant;
1017 client_obd_list_unlock(&cli->cl_loi_list_lock);
1019 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1020 cli->cl_import->imp_obd->obd_name,
1021 cli->cl_avail_grant, cli->cl_lost_grant);
1023 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1024 cfs_list_empty(&cli->cl_grant_shrink_list))
1025 osc_add_shrink_grant(cli);
1028 /* We assume that the reason this OSC got a short read is because it read
1029 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1030 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1031 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill everything in the page array that the short read did not cover.
 *
 * The first loop walks past the pages fully covered by @nob_read; in the page
 * where the data ends, the bytes after the last one read are cleared.  The
 * second loop clears the whole requested range of every remaining page.
 * Pages are mapped with cfs_kmap() only for the duration of each memset. */
1032 static void handle_short_read(int nob_read, obd_count page_count,
1033 struct brw_page **pga)
1038 /* skip bytes read OK */
1039 while (nob_read > 0) {
1040 LASSERT (page_count > 0);
1042 if (pga[i]->count > nob_read) {
1043 /* EOF inside this page */
1044 ptr = cfs_kmap(pga[i]->pg) +
1045 (pga[i]->off & ~CFS_PAGE_MASK);
1046 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1047 cfs_kunmap(pga[i]->pg);
1053 nob_read -= pga[i]->count;
1058 /* zero remaining pages */
1059 while (page_count-- > 0) {
1060 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1061 memset(ptr, 0, pga[i]->count);
1062 cfs_kunmap(pga[i]->pg);
/* Validate the server reply to a bulk write.
 *
 * The reply must carry a vector of per-niobuf return codes (RMF_RCS); a
 * missing or short vector is logged.  Each of the @niocount entries is then
 * checked: a negative value is returned to the caller as the error, and any
 * other non-zero value is logged as invalid.  Finally the number of bytes
 * actually moved by the bulk descriptor must equal @requested_nob, otherwise
 * an error is logged. */
1067 static int check_write_rcs(struct ptlrpc_request *req,
1068 int requested_nob, int niocount,
1069 obd_count page_count, struct brw_page **pga)
1074 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1075 sizeof(*remote_rcs) *
1077 if (remote_rcs == NULL) {
1078 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1082 /* return error if any niobuf was in error */
1083 for (i = 0; i < niocount; i++) {
1084 if ((int)remote_rcs[i] < 0)
1085 return(remote_rcs[i]);
1087 if (remote_rcs[i] != 0) {
1088 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1089 i, remote_rcs[i], req);
1094 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1095 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1096 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Decide whether two consecutive pages can share one remote niobuf.
 *
 * Pages with identical flags are mergeable when @p2 starts exactly where @p1
 * ends (p1->off + p1->count == p2->off).  When the flags differ, a warning
 * is logged if they differ in any bit outside the set known to be safe to
 * mix (FROM_GRANT, NOCACHE, SYNC, ASYNC, NOQUOTA). */
1103 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1105 if (p1->flag != p2->flag) {
1106 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1107 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1109 /* warn if we try to combine flags that we don't know to be
1110 * safe to combine */
1111 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1112 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1113 "report this at http://bugs.whamcloud.com/\n",
1114 p1->flag, p2->flag);
1119 return (p1->off + p1->count == p2->off);
/* Compute the bulk data checksum over the first @nob bytes described by the
 * @pg_count pages in @pga, using algorithm @cksum_type.
 *
 * Each page is mapped with cfs_kmap(), the used range (offset within the
 * page, length capped at the bytes still to be covered) is folded into the
 * running checksum, and the page is unmapped again.  Two fault-injection
 * hooks exist: for OST_READ the first page can be overwritten with the bytes
 * bad1 before checksumming, and for OST_WRITE the computed checksum itself
 * can be falsified so the data stays intact for a resend. */
1122 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1123 struct brw_page **pga, int opc,
1124 cksum_type_t cksum_type)
1129 LASSERT (pg_count > 0);
1130 cksum = init_checksum(cksum_type);
1131 while (nob > 0 && pg_count > 0) {
1132 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1133 int off = pga[i]->off & ~CFS_PAGE_MASK;
1134 int count = pga[i]->count > nob ? nob : pga[i]->count;
1136 /* corrupt the data before we compute the checksum, to
1137 * simulate an OST->client data error */
1138 if (i == 0 && opc == OST_READ &&
1139 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1140 memcpy(ptr + off, "bad1", min(4, nob));
1141 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1142 cfs_kunmap(pga[i]->pg);
1143 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1146 nob -= pga[i]->count;
1150 /* For sending we only compute the wrong checksum instead
1151 * of corrupting the data so it is still correct on a redo */
1152 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1155 return fini_checksum(cksum, cksum_type);
1158 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1159 struct lov_stripe_md *lsm, obd_count page_count,
1160 struct brw_page **pga,
1161 struct ptlrpc_request **reqp,
1162 struct obd_capa *ocapa, int reserve,
1165 struct ptlrpc_request *req;
1166 struct ptlrpc_bulk_desc *desc;
1167 struct ost_body *body;
1168 struct obd_ioobj *ioobj;
1169 struct niobuf_remote *niobuf;
1170 int niocount, i, requested_nob, opc, rc;
1171 struct osc_brw_async_args *aa;
1172 struct req_capsule *pill;
1173 struct brw_page *pg_prev;
1176 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1177 RETURN(-ENOMEM); /* Recoverable */
1178 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1179 RETURN(-EINVAL); /* Fatal */
1181 if ((cmd & OBD_BRW_WRITE) != 0) {
1183 req = ptlrpc_request_alloc_pool(cli->cl_import,
1184 cli->cl_import->imp_rq_pool,
1185 &RQF_OST_BRW_WRITE);
1188 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1193 for (niocount = i = 1; i < page_count; i++) {
1194 if (!can_merge_pages(pga[i - 1], pga[i]))
1198 pill = &req->rq_pill;
1199 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1201 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1202 niocount * sizeof(*niobuf));
1203 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1205 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1207 ptlrpc_request_free(req);
1210 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1211 ptlrpc_at_set_req_timeout(req);
1213 if (opc == OST_WRITE)
1214 desc = ptlrpc_prep_bulk_imp(req, page_count,
1215 BULK_GET_SOURCE, OST_BULK_PORTAL);
1217 desc = ptlrpc_prep_bulk_imp(req, page_count,
1218 BULK_PUT_SINK, OST_BULK_PORTAL);
1221 GOTO(out, rc = -ENOMEM);
1222 /* NB request now owns desc and will free it when it gets freed */
1224 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1225 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1226 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1227 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1229 lustre_set_wire_obdo(&body->oa, oa);
1231 obdo_to_ioobj(oa, ioobj);
1232 ioobj->ioo_bufcnt = niocount;
1233 osc_pack_capa(req, body, ocapa);
1234 LASSERT (page_count > 0);
1236 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1237 struct brw_page *pg = pga[i];
1238 int poff = pg->off & ~CFS_PAGE_MASK;
1240 LASSERT(pg->count > 0);
1241 /* make sure there is no gap in the middle of page array */
1242 LASSERTF(page_count == 1 ||
1243 (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1244 ergo(i > 0 && i < page_count - 1,
1245 poff == 0 && pg->count == CFS_PAGE_SIZE) &&
1246 ergo(i == page_count - 1, poff == 0)),
1247 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1248 i, page_count, pg, pg->off, pg->count);
1250 LASSERTF(i == 0 || pg->off > pg_prev->off,
1251 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1252 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1254 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1255 pg_prev->pg, page_private(pg_prev->pg),
1256 pg_prev->pg->index, pg_prev->off);
1258 LASSERTF(i == 0 || pg->off > pg_prev->off,
1259 "i %d p_c %u\n", i, page_count);
1261 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1262 (pg->flag & OBD_BRW_SRVLOCK));
1264 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1265 requested_nob += pg->count;
1267 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1269 niobuf->len += pg->count;
1271 niobuf->offset = pg->off;
1272 niobuf->len = pg->count;
1273 niobuf->flags = pg->flag;
1278 LASSERTF((void *)(niobuf - niocount) ==
1279 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1280 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1281 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1283 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1285 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1286 body->oa.o_valid |= OBD_MD_FLFLAGS;
1287 body->oa.o_flags = 0;
1289 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1292 if (osc_should_shrink_grant(cli))
1293 osc_shrink_grant_local(cli, &body->oa);
1295 /* size[REQ_REC_OFF] still sizeof (*body) */
1296 if (opc == OST_WRITE) {
1297 if (cli->cl_checksum &&
1298 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1299 /* store cl_cksum_type in a local variable since
1300 * it can be changed via lprocfs */
1301 cksum_type_t cksum_type = cli->cl_cksum_type;
1303 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1304 oa->o_flags &= OBD_FL_LOCAL_MASK;
1305 body->oa.o_flags = 0;
1307 body->oa.o_flags |= cksum_type_pack(cksum_type);
1308 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1309 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1313 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1315 /* save this in 'oa', too, for later checking */
1316 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1317 oa->o_flags |= cksum_type_pack(cksum_type);
1319 /* clear out the checksum flag, in case this is a
1320 * resend but cl_checksum is no longer set. b=11238 */
1321 oa->o_valid &= ~OBD_MD_FLCKSUM;
1323 oa->o_cksum = body->oa.o_cksum;
1324 /* 1 RC per niobuf */
1325 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1326 sizeof(__u32) * niocount);
1328 if (cli->cl_checksum &&
1329 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1330 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1331 body->oa.o_flags = 0;
1332 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1333 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1336 ptlrpc_request_set_replen(req);
1338 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1339 aa = ptlrpc_req_async_args(req);
1341 aa->aa_requested_nob = requested_nob;
1342 aa->aa_nio_count = niocount;
1343 aa->aa_page_count = page_count;
1347 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1348 if (ocapa && reserve)
1349 aa->aa_ocapa = capa_get(ocapa);
1355 ptlrpc_req_finished(req);
/*
 * Diagnose a write checksum disagreement between client and server.
 *
 * oa is the obdo taken from the server reply, peer identifies the server,
 * client_cksum and server_cksum are the two values being compared, nob,
 * page_count and pga describe the pages that were written, and
 * client_cksum_type is the checksum type the client used for the request.
 *
 * When both checksums are equal only a D_PAGE debug line is emitted.
 * Otherwise the checksum is recomputed over pga with osc_checksum_bulk and
 * compared with both the server value and the original client value in
 * order to choose a diagnostic string, which is then reported through
 * LCONSOLE_ERROR_MSG (0x132) and CERROR.
 *
 * NOTE(review): the caller uses the result as a truth value, presumably
 * nonzero on mismatch - confirm against the return statements.
 */
1359 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1360 __u32 client_cksum, __u32 server_cksum, int nob,
1361 obd_count page_count, struct brw_page **pga,
1362 cksum_type_t client_cksum_type)
1366 cksum_type_t cksum_type;
1368 if (server_cksum == client_cksum) {
1369 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1373 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1375 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch: wrong type from server, pages changed locally,
 * or data changed in transit. */
1378 if (cksum_type != client_cksum_type)
1379 msg = "the server did not use the checksum type specified in "
1380 "the original request - likely a protocol problem";
1381 else if (new_cksum == server_cksum)
1382 msg = "changed on the client after we checksummed it - "
1383 "likely false positive due to mmap IO (bug 11742)";
1384 else if (new_cksum == client_cksum)
1385 msg = "changed in transit before arrival at OST";
1387 msg = "changed in transit AND doesn't match the original - "
1388 "likely false positive due to mmap IO (bug 11742)";
1390 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1391 " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1392 msg, libcfs_nid2str(peer->nid),
1393 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1394 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1395 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1397 oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1399 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1400 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1401 "client csum now %x\n", client_cksum, client_cksum_type,
1402 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a bulk read or write request.
 *
 * req is the completed request; its async args hold the osc_brw_async_args
 * (client, obdo, page array, requested byte count). rc is the transfer
 * result on entry.
 *
 * Visible steps, in order:
 *  - a negative rc other than -EDQUOT is logged as a failed request;
 *  - the OST body is fetched from the reply capsule;
 *  - for OST_WRITE replies carrying user or group quota bits, the over
 *    quota state is updated through osc_quota_setdq;
 *  - grant accounting is updated with osc_update_grant;
 *  - writes: bd_nob is asserted equal to the requested byte count, the
 *    bulk is unwrapped, the checksum is verified with
 *    check_write_checksum and per-niobuf return codes with
 *    check_write_rcs;
 *  - reads: the bulk is unwrapped, rc is sanity checked against the
 *    requested and transferred byte counts, short reads are handled by
 *    handle_short_read, and the server checksum (when OBD_MD_FLCKSUM is
 *    set in the reply) is compared with a locally computed one;
 *  - the reply obdo is copied into aa->aa_oa with lustre_get_wire_obdo.
 *
 * NOTE(review): the exact return values of the individual failure
 * branches should be confirmed against the full function body.
 */
1406 /* Note rc enters this function as number of bytes transferred */
1407 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1409 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1410 const lnet_process_id_t *peer =
1411 &req->rq_import->imp_connection->c_peer;
1412 struct client_obd *cli = aa->aa_cli;
1413 struct ost_body *body;
1414 __u32 client_cksum = 0;
1417 if (rc < 0 && rc != -EDQUOT) {
1418 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1422 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1423 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1425 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1429 /* set/clear over quota flag for a uid/gid */
1430 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1431 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1432 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1434 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1435 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1437 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1440 osc_update_grant(cli, body);
1445 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1446 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1448 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1450 CERROR("Unexpected +ve rc %d\n", rc);
1453 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1455 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1458 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1459 check_write_checksum(&body->oa, peer, client_cksum,
1460 body->oa.o_cksum, aa->aa_requested_nob,
1461 aa->aa_page_count, aa->aa_ppga,
1462 cksum_type_unpack(aa->aa_oa->o_flags)))
1465 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1466 aa->aa_page_count, aa->aa_ppga);
1470 /* The rest of this function executes only for OST_READs */
1472 /* if unwrap_bulk failed, return -EAGAIN to retry */
1473 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1475 GOTO(out, rc = -EAGAIN);
1477 if (rc > aa->aa_requested_nob) {
1478 CERROR("Unexpected rc %d (%d requested)\n", rc,
1479 aa->aa_requested_nob);
1483 if (rc != req->rq_bulk->bd_nob_transferred) {
1484 CERROR ("Unexpected rc %d (%d transferred)\n",
1485 rc, req->rq_bulk->bd_nob_transferred);
1489 if (rc < aa->aa_requested_nob)
1490 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1492 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1493 static int cksum_counter;
1494 __u32 server_cksum = body->oa.o_cksum;
1497 cksum_type_t cksum_type;
1499 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1500 body->oa.o_flags : 0);
1501 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1502 aa->aa_ppga, OST_READ,
1505 if (peer->nid == req->rq_bulk->bd_sender) {
1509 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1512 if (server_cksum == ~0 && rc > 0) {
1513 CERROR("Protocol error: server %s set the 'checksum' "
1514 "bit, but didn't send a checksum. Not fatal, "
1515 "but please notify on http://bugs.whamcloud.com/\n",
1516 libcfs_nid2str(peer->nid));
1517 } else if (server_cksum != client_cksum) {
1518 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1519 "%s%s%s inode "DFID" object "
1520 LPU64"/"LPU64" extent "
1521 "["LPU64"-"LPU64"]\n",
1522 req->rq_import->imp_obd->obd_name,
1523 libcfs_nid2str(peer->nid),
1525 body->oa.o_valid & OBD_MD_FLFID ?
1526 body->oa.o_parent_seq : (__u64)0,
1527 body->oa.o_valid & OBD_MD_FLFID ?
1528 body->oa.o_parent_oid : 0,
1529 body->oa.o_valid & OBD_MD_FLFID ?
1530 body->oa.o_parent_ver : 0,
1532 body->oa.o_valid & OBD_MD_FLGROUP ?
1533 body->oa.o_seq : (__u64)0,
1534 aa->aa_ppga[0]->off,
1535 aa->aa_ppga[aa->aa_page_count-1]->off +
1536 aa->aa_ppga[aa->aa_page_count-1]->count -
1538 CERROR("client %x, server %x, cksum_type %x\n",
1539 client_cksum, server_cksum, cksum_type);
1541 aa->aa_oa->o_cksum = client_cksum;
1545 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1548 } else if (unlikely(client_cksum)) {
1549 static int cksum_missed;
/* x & -x isolates the lowest set bit, so this is true only when
 * cksum_missed is zero or a power of two: the message is rate limited. */
1552 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1553 CERROR("Checksum %u requested from %s but not sent\n",
1554 cksum_missed, libcfs_nid2str(peer->nid));
1560 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
/*
 * Issue one synchronous bulk read or write RPC, resending on recoverable
 * errors.
 *
 * cmd selects read or write, exp is the export to send on, oa carries the
 * object attributes, lsm is forwarded to osc_brw_prep_request, page_count
 * and pga describe the pages, ocapa is the capability to pack.
 *
 * The import generation is sampled once up front and stamped into every
 * request built here. Each attempt prepares a request with
 * osc_brw_prep_request, delays its send time by the current resend count
 * (in seconds), waits for it with ptlrpc_queue_wait and post-processes the
 * reply with osc_brw_fini_request. On a recoverable error the function
 * checks the resend budget (skipped for -EINPROGRESS) and whether the
 * import generation changed, then sleeps for a number of seconds equal to
 * the resend count before the next attempt.
 *
 * NOTE(review): the final test for -EAGAIN / -EINPROGRESS presumably
 * remaps rc before returning - confirm in the full body.
 */
1565 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1566 struct lov_stripe_md *lsm,
1567 obd_count page_count, struct brw_page **pga,
1568 struct obd_capa *ocapa)
1570 struct ptlrpc_request *req;
1573 int generation, resends = 0;
1574 struct l_wait_info lwi;
1578 cfs_waitq_init(&waitq);
1579 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1582 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1583 page_count, pga, &req, ocapa, 0, resends);
1588 req->rq_generation_set = 1;
1589 req->rq_import_generation = generation;
1590 req->rq_sent = cfs_time_current_sec() + resends;
1593 rc = ptlrpc_queue_wait(req);
1595 if (rc == -ETIMEDOUT && req->rq_resend) {
1596 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1597 ptlrpc_req_finished(req);
1601 rc = osc_brw_fini_request(req, rc);
1603 ptlrpc_req_finished(req);
1604 /* When server return -EINPROGRESS, client should always retry
1605 * regardless of the number of times the bulk was resent already.*/
1606 if (osc_recoverable_error(rc)) {
1608 if (rc != -EINPROGRESS &&
1609 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1610 CERROR("%s: too many resend retries for object: "
1611 ""LPU64":"LPU64", rc = %d.\n",
1612 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1616 exp->exp_obd->u.cli.cl_import->imp_generation) {
1617 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1618 ""LPU64":"LPU64", rc = %d.\n",
1619 exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1623 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1625 l_wait_event(waitq, 0, &lwi);
1630 if (rc == -EAGAIN || rc == -EINPROGRESS)
/*
 * Build a replacement for an asynchronous bulk request that failed with a
 * recoverable error and queue it on the same request set.
 *
 * request is the failed request and aa its async args. A new request is
 * prepared with the same opcode, client, obdo, page array and capability
 * (reserve = 0, resend = 1). Under cl_loi_list_lock the function then:
 *  - asserts that any oap pointing at a request points at this one, and
 *    drops the new request if such an oap was interrupted;
 *  - copies the interpret callback and async args to the new request and
 *    delays its send time by aa->aa_resends seconds;
 *  - moves the oap list to the new async args (a list head cannot simply
 *    be copied, so it is spliced and the old head reinitialised);
 *  - swaps the request reference held by each oap from old to new;
 *  - hands the capability reference over to the new async args;
 *  - adds the new request to the set of the original request.
 */
1635 int osc_brw_redo_request(struct ptlrpc_request *request,
1636 struct osc_brw_async_args *aa)
1638 struct ptlrpc_request *new_req;
1639 struct ptlrpc_request_set *set = request->rq_set;
1640 struct osc_brw_async_args *new_aa;
1641 struct osc_async_page *oap;
1645 DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1647 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1648 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1649 aa->aa_cli, aa->aa_oa,
1650 NULL /* lsm unused by osc currently */,
1651 aa->aa_page_count, aa->aa_ppga,
1652 &new_req, aa->aa_ocapa, 0, 1);
1656 client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1658 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1659 if (oap->oap_request != NULL) {
1660 LASSERTF(request == oap->oap_request,
1661 "request %p != oap_request %p\n",
1662 request, oap->oap_request);
1663 if (oap->oap_interrupted) {
1664 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1665 ptlrpc_req_finished(new_req);
1670 /* New request takes over pga and oaps from old request.
1671 * Note that copying a list_head doesn't work, need to move it... */
1673 new_req->rq_interpret_reply = request->rq_interpret_reply;
1674 new_req->rq_async_args = request->rq_async_args;
1675 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1676 new_req->rq_generation_set = 1;
1677 new_req->rq_import_generation = request->rq_import_generation;
1679 new_aa = ptlrpc_req_async_args(new_req);
1681 CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1682 cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1683 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1685 cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1686 if (oap->oap_request) {
1687 ptlrpc_req_finished(oap->oap_request);
1688 oap->oap_request = ptlrpc_request_addref(new_req);
1692 new_aa->aa_ocapa = aa->aa_ocapa;
1693 aa->aa_ocapa = NULL;
1695 /* use ptlrpc_set_add_req is safe because interpret functions work
1696 * in check_set context. only one way exist with access to request
1697 * from different thread got -EINTR - this way protected with
1698 * cl_loi_list_lock */
1699 ptlrpc_set_add_req(set, new_req);
1701 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1703 DEBUG_REQ(D_INFO, new_req, "new request");
1708 * ugh, we want disk allocation on the target to happen in offset order. We'll
1709 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1710 * fine for our small page arrays and doesn't require allocation. It's an
1711 * insertion sort that swaps elements that are strides apart, shrinking the
1712 * stride down until it's '1' and the array is sorted.
/*
 * Sort an array of brw_page pointers in place by ascending ->off.
 *
 * array holds num pointers. The initial stride is taken from the sequence
 * 1, 4, 13, ... (stride * 3 + 1), stopping at the first value that is not
 * below num. Each pass then runs an insertion sort over elements that are
 * one stride apart, and passes repeat while the stride is above 1.
 *
 * NOTE(review): the stride reduction between passes presumably divides by
 * three, mirroring the growth step - confirm in the full body.
 */
1714 static void sort_brw_pages(struct brw_page **array, int num)
1717 struct brw_page *tmp;
1721 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1726 for (i = stride ; i < num ; i++) {
1729 while (j >= stride && array[j - stride]->off > tmp->off) {
1730 array[j] = array[j - stride];
1735 } while (stride > 1);
/*
 * Count how many leading entries of pg can go into one request without
 * leaving a hole inside the page run.
 *
 * pg is the page array and pages the number of candidates (asserted to be
 * greater than zero). The in-page offset of each entry is off masked with
 * ~CFS_PAGE_MASK. Counting stops when the candidates are exhausted, when
 * an entry does not extend to the end of its page, or when the following
 * entry does not start at offset zero of its page.
 */
1738 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1744 LASSERT (pages > 0);
1745 offset = pg[i]->off & ~CFS_PAGE_MASK;
1749 if (pages == 0) /* that's all */
1752 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1753 return count; /* doesn't end on page boundary */
1756 offset = pg[i]->off & ~CFS_PAGE_MASK;
1757 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of count brw_page pointers for the flat array pga.
 *
 * The array is allocated with OBD_ALLOC and must be released with
 * osc_release_ppga using the same count.
 * NOTE(review): the loop presumably stores the address of pga[i] in slot
 * i and the function presumably returns NULL on allocation failure -
 * confirm in the full body.
 */
1764 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1766 struct brw_page **ppga;
1769 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1773 for (i = 0; i < count; i++)
/*
 * Free a brw_page pointer array.
 *
 * ppga must not be NULL (asserted). count must be the element count the
 * array was allocated with, because the freed size is computed from it.
 */
1778 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1780 LASSERT(ppga != NULL);
1781 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk read/write over a flat array of brw_page.
 *
 * cmd carries the OBD_BRW_* flags, exp is the export, oinfo supplies the
 * obdo (oi_oa), stripe metadata (oi_md) and capability (oi_capa),
 * page_count and pga describe the pages. oti is not referenced in the
 * visible lines.
 *
 * With OBD_BRW_CHECK set the caller only asks whether the I/O has a chance
 * to succeed; the visible test is imp->imp_invalid. Otherwise a pointer
 * array is built over pga, sorted by offset, and sent in chunks through
 * osc_brw_internal. A chunk holds at most cl_max_pages_per_rpc pages and
 * is further limited by max_unfragmented_pages. The pointer array and any
 * saved obdo copy are freed before returning.
 */
1784 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1785 obd_count page_count, struct brw_page *pga,
1786 struct obd_trans_info *oti)
1788 struct obdo *saved_oa = NULL;
1789 struct brw_page **ppga, **orig;
1790 struct obd_import *imp = class_exp2cliimp(exp);
1791 struct client_obd *cli;
1792 int rc, page_count_orig;
1795 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1796 cli = &imp->imp_obd->u.cli;
1798 if (cmd & OBD_BRW_CHECK) {
1799 /* The caller just wants to know if there's a chance that this
1800 * I/O can succeed */
1802 if (imp->imp_invalid)
1807 /* test_brw with a failed create can trip this, maybe others. */
1808 LASSERT(cli->cl_max_pages_per_rpc);
1812 orig = ppga = osc_build_ppga(pga, page_count);
1815 page_count_orig = page_count;
1817 sort_brw_pages(ppga, page_count);
1818 while (page_count) {
1819 obd_count pages_per_brw;
1821 if (page_count > cli->cl_max_pages_per_rpc)
1822 pages_per_brw = cli->cl_max_pages_per_rpc;
1824 pages_per_brw = page_count;
1826 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
/* Each chunk overwrites oi_oa, so a pristine copy is kept once it is
 * known that more than one chunk is needed and is restored before every
 * later chunk. */
1828 if (saved_oa != NULL) {
1829 /* restore previously saved oa */
1830 *oinfo->oi_oa = *saved_oa;
1831 } else if (page_count > pages_per_brw) {
1832 /* save a copy of oa (brw will clobber it) */
1833 OBDO_ALLOC(saved_oa);
1834 if (saved_oa == NULL)
1835 GOTO(out, rc = -ENOMEM);
1836 *saved_oa = *oinfo->oi_oa;
1839 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1840 pages_per_brw, ppga, oinfo->oi_capa);
1845 page_count -= pages_per_brw;
1846 ppga += pages_per_brw;
1850 osc_release_ppga(orig, page_count_orig);
1852 if (saved_oa != NULL)
1853 OBDO_FREE(saved_oa);
/*
 * Reply interpreter for asynchronous bulk requests (installed as
 * rq_interpret_reply by osc_build_rpc).
 *
 * env is the lu environment, req the completed request, data its
 * osc_brw_async_args and rc the request result.
 *
 * The reply is first post-processed by osc_brw_fini_request. For a
 * recoverable error the request is either abandoned (import generation
 * changed, logged as resend cross eviction), redone through
 * osc_brw_redo_request (always for -EINPROGRESS, otherwise while
 * client_should_resend allows it), or reported as having too many retries.
 *
 * Completion then releases the capability reference and, under
 * cl_loi_list_lock, decrements the read or write in-flight counter,
 * completes every oap on aa_oaps with osc_ap_completion, frees aa_oa,
 * wakes cache waiters and unplugs further I/O. After the lock is dropped
 * the cl_req is completed with either the negative rc or the number of
 * bytes transferred, the page pointer array is released and the transfer
 * is accounted with ptlrpc_lprocfs_brw.
 *
 * NOTE(review): a successful redo presumably returns before the
 * completion part runs - confirm in the full body.
 */
1858 static int brw_interpret(const struct lu_env *env,
1859 struct ptlrpc_request *req, void *data, int rc)
1861 struct osc_brw_async_args *aa = data;
1862 struct osc_async_page *oap, *tmp;
1863 struct client_obd *cli;
1866 rc = osc_brw_fini_request(req, rc);
1867 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1868 /* When server return -EINPROGRESS, client should always retry
1869 * regardless of the number of times the bulk was resent already. */
1870 if (osc_recoverable_error(rc)) {
1871 if (req->rq_import_generation !=
1872 req->rq_import->imp_generation) {
1873 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1874 ""LPU64":"LPU64", rc = %d.\n",
1875 req->rq_import->imp_obd->obd_name,
1876 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1877 } else if (rc == -EINPROGRESS ||
1878 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1879 rc = osc_brw_redo_request(req, aa);
1881 CERROR("%s: too many resent retries for object: "
1882 ""LPU64":"LPU64", rc = %d.\n",
1883 req->rq_import->imp_obd->obd_name,
1884 aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1889 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1894 capa_put(aa->aa_ocapa);
1895 aa->aa_ocapa = NULL;
1899 client_obd_list_lock(&cli->cl_loi_list_lock);
1901 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1902 * is called so we know whether to go to sync BRWs or wait for more
1903 * RPCs to complete */
1904 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1905 cli->cl_w_in_flight--;
1907 cli->cl_r_in_flight--;
1909 /* the caller may re-use the oap after the completion call so
1910 * we need to clean it up a little */
1911 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
1913 cfs_list_del_init(&oap->oap_rpc_item);
1914 osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1916 OBDO_FREE(aa->aa_oa);
1918 osc_wake_cache_waiters(cli);
1919 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1920 client_obd_list_unlock(&cli->cl_loi_list_lock);
1922 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1923 req->rq_bulk->bd_nob_transferred);
1924 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1925 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
/*
 * Build one asynchronous bulk RPC from the async pages on rpc_list and
 * hand it to ptlrpcd.
 *
 * env is the lu environment, cli the client, rpc_list a non-empty list of
 * osc_async_page linked through oap_rpc_item, page_count the number of
 * entries on it and cmd the OBD_BRW_* flags (OBD_BRW_WRITE selects a write
 * request, OBD_BRW_MEMALLOC brackets the work with the memory pressure
 * flag and sets rq_memalloc).
 *
 * Construction: a pointer array is filled from the oap_brw_page of each
 * oap, a cl_req is allocated from the first page and every page is added
 * to it, attributes are collected with cl_req_attr_set, the pages are
 * sorted by offset and the request is prepared by osc_brw_prep_request
 * (reserve = 1, resend = 0). brw_interpret becomes the reply callback,
 * timestamps are refreshed after the request is built, the job id is
 * packed, and the oaps are moved from rpc_list onto aa->aa_oaps.
 *
 * Failure: the pointer array is freed and, under cl_loi_list_lock, every
 * oap still on rpc_list is unlinked and completed with an error; the
 * cl_req, if one was allocated, is completed with rc.
 *
 * Success: under cl_loi_list_lock the request is marked interrupted if
 * an oap was interrupted, one oap receives a reference on the request,
 * and the request is queued with ptlrpcd_add_req.
 *
 * The comment retained below states that the function returns with
 * cl_loi_list_lock held.
 */
1930 /* The most tricky part of this function is that it will return with
1931 * cli->cli_loi_list_lock held.
1933 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1934 cfs_list_t *rpc_list, int page_count, int cmd,
1937 struct ptlrpc_request *req = NULL;
1938 struct brw_page **pga = NULL;
1939 struct osc_brw_async_args *aa = NULL;
1940 struct obdo *oa = NULL;
1941 struct osc_async_page *oap;
1942 struct osc_async_page *tmp;
1943 struct cl_req *clerq = NULL;
1944 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1945 struct ldlm_lock *lock = NULL;
1946 struct cl_req_attr crattr;
1947 int i, rc, mpflag = 0;
1950 LASSERT(!cfs_list_empty(rpc_list));
1952 if (cmd & OBD_BRW_MEMALLOC)
1953 mpflag = cfs_memory_pressure_get_and_set();
1955 memset(&crattr, 0, sizeof crattr);
1956 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1958 GOTO(out, rc = -ENOMEM);
1962 GOTO(out, rc = -ENOMEM);
1965 cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1966 struct cl_page *page = osc_oap2cl_page(oap);
1967 if (clerq == NULL) {
1968 clerq = cl_req_alloc(env, page, crt,
1969 1 /* only 1-object rpcs for
1972 GOTO(out, rc = PTR_ERR(clerq));
1973 lock = oap->oap_ldlm_lock;
1975 pga[i] = &oap->oap_brw_page;
1976 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1977 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1978 pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1980 cl_req_page_add(env, clerq, page);
1983 /* always get the data for the obdo for the rpc */
1984 LASSERT(clerq != NULL);
1986 crattr.cra_capa = NULL;
1987 memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
1988 cl_req_attr_set(env, clerq, &crattr, ~0ULL);
1990 oa->o_handle = lock->l_remote_handle;
1991 oa->o_valid |= OBD_MD_FLHANDLE;
1994 rc = cl_req_prep(env, clerq);
1996 CERROR("cl_req_prep failed: %d\n", rc);
2000 sort_brw_pages(pga, page_count);
2001 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2002 pga, &req, crattr.cra_capa, 1, 0);
2004 CERROR("prep_req failed: %d\n", rc);
2008 req->rq_interpret_reply = brw_interpret;
2009 if (cmd & OBD_BRW_MEMALLOC)
2010 req->rq_memalloc = 1;
2012 /* Need to update the timestamps after the request is built in case
2013 * we race with setattr (locally or in queue at OST). If OST gets
2014 * later setattr before earlier BRW (as determined by the request xid),
2015 * the OST will not use BRW timestamps. Sadly, there is no obvious
2016 * way to do this in a single call. bug 10150 */
2017 cl_req_attr_set(env, clerq, &crattr,
2018 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2020 lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2022 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2023 aa = ptlrpc_req_async_args(req);
2024 CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2025 cfs_list_splice(rpc_list, &aa->aa_oaps);
2026 CFS_INIT_LIST_HEAD(rpc_list);
2027 aa->aa_clerq = clerq;
2029 if (cmd & OBD_BRW_MEMALLOC)
2030 cfs_memory_pressure_restore(mpflag);
2032 capa_put(crattr.cra_capa);
2034 LASSERT(req == NULL);
2039 OBD_FREE(pga, sizeof(*pga) * page_count);
2040 /* this should happen rarely and is pretty bad, it makes the
2041 * pending list not follow the dirty order */
2042 client_obd_list_lock(&cli->cl_loi_list_lock);
2043 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2044 cfs_list_del_init(&oap->oap_rpc_item);
2046 /* queued sync pages can be torn down while the pages
2047 * were between the pending list and the rpc */
2048 if (oap->oap_interrupted) {
2049 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2050 osc_ap_completion(env, cli, NULL, oap, 0,
2054 osc_ap_completion(env, cli, NULL, oap, 0, rc);
2056 if (clerq && !IS_ERR(clerq))
2057 cl_req_completion(env, clerq, rc);
2059 struct osc_async_page *tmp = NULL;
2061 /* queued sync pages can be torn down while the pages
2062 * were between the pending list and the rpc */
2063 LASSERT(aa != NULL);
2064 client_obd_list_lock(&cli->cl_loi_list_lock);
2065 cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2066 /* only one oap gets a request reference */
2069 if (oap->oap_interrupted && !req->rq_intr) {
2070 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2072 ptlrpc_mark_interrupted(req);
2076 tmp->oap_request = ptlrpc_request_addref(req);
2078 DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight",
2079 page_count, aa, cli->cl_r_in_flight,
2080 cli->cl_w_in_flight);
2082 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2083 * see which CPU/NUMA node the majority of pages were allocated
2084 * on, and try to assign the async RPC to the CPU core
2085 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2087 * But on the other hand, we expect that multiple ptlrpcd
2088 * threads and the initial write sponsor can run in parallel,
2089 * especially when data checksum is enabled, which is CPU-bound
2090 * operation and single ptlrpcd thread cannot process in time.
2091 * So more ptlrpcd threads sharing BRW load
2092 * (with PDL_POLICY_ROUND) seems better.
2094 ptlrpcd_add_req(req, pol, -1);
/*
 * Attach the callback data from einfo to a DLM lock unless the lock
 * already carries different data.
 *
 * lock must be non-NULL and its blocking, completion and glimpse ASTs and
 * resource type must match einfo (all asserted). With the resource lock
 * and the osc_ast_guard spinlock held, l_ast_data is set to
 * einfo->ei_cbdata when it is still NULL, and is then compared against
 * that value.
 *
 * NOTE(review): callers use the result as a truth value, presumably
 * nonzero when l_ast_data ends up equal to the requested data - confirm.
 */
2099 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2100 struct ldlm_enqueue_info *einfo)
2102 void *data = einfo->ei_cbdata;
2105 LASSERT(lock != NULL);
2106 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2107 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2108 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2109 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2111 lock_res_and_lock(lock);
2112 cfs_spin_lock(&osc_ast_guard);
2114 if (lock->l_ast_data == NULL)
2115 lock->l_ast_data = data;
2116 if (lock->l_ast_data == data)
2119 cfs_spin_unlock(&osc_ast_guard);
2120 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check.
 *
 * Resolves lockh to a lock with ldlm_handle2lock, applies the check and
 * drops the lock reference again. The CERROR path reports the handle and
 * callback data with a hint that the client may have been evicted.
 * NOTE(review): that path is presumably taken when the handle no longer
 * resolves to a lock - confirm the condition in the full body.
 */
2125 static int osc_set_data_with_check(struct lustre_handle *lockh,
2126 struct ldlm_enqueue_info *einfo)
2128 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2132 set = osc_set_lock_data_with_check(lock, einfo);
2133 LDLM_LOCK_PUT(lock);
2135 CERROR("lockh %p, data %p - client evicted?\n",
2136 lockh, einfo->ei_cbdata);
/*
 * Run the iterator callback replace, with argument data, over the locks
 * of the resource that corresponds to the object described by lsm.
 *
 * The resource name is built from lsm_object_id and lsm_object_seq and the
 * iteration happens in the namespace of the obd device behind exp. The
 * result of ldlm_resource_iterate is not captured here.
 */
2140 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2141 ldlm_iterator_t replace, void *data)
2143 struct ldlm_res_id res_id;
2144 struct obd_device *obd = class_exp2obd(exp);
2146 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2147 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/*
 * Iterate over the locks of the resource that corresponds to the object
 * described by lsm, calling replace with data for each, and translate the
 * iterator result for the caller.
 *
 * NOTE(review): LDLM_ITER_STOP and LDLM_ITER_CONTINUE are each mapped to a
 * distinct return value; the values themselves should be confirmed in the
 * full body.
 */
2151 /* find any ldlm lock of the inode in osc
2155 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2156 ldlm_iterator_t replace, void *data)
2158 struct ldlm_res_id res_id;
2159 struct obd_device *obd = class_exp2obd(exp);
2162 osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2163 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2164 if (rc == LDLM_ITER_STOP)
2166 if (rc == LDLM_ITER_CONTINUE)
/*
 * Common tail of a lock enqueue: refine rc from the reply, flag the LVB as
 * ready and run the caller supplied upcall.
 *
 * req is the enqueue request, lvb the lock value block that was filled in,
 * upcall and cookie the completion callback and its argument, flags the
 * LDLM flag word (read for LDLM_FL_HAS_INTENT, updated with
 * LDLM_FL_LVB_READY), agl is nonzero for asynchronous glimpse locks and rc
 * is the enqueue result.
 *
 * When rc is ELDLM_LOCK_ABORTED the ldlm_reply is read from the reply and
 * a nonzero lock_policy_res1 replaces rc. The value returned by the upcall
 * is stored back into rc.
 */
2171 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2172 obd_enqueue_update_f upcall, void *cookie,
2173 int *flags, int agl, int rc)
2175 int intent = *flags & LDLM_FL_HAS_INTENT;
2179 /* The request was created before ldlm_cli_enqueue call. */
2180 if (rc == ELDLM_LOCK_ABORTED) {
2181 struct ldlm_reply *rep;
2182 rep = req_capsule_server_get(&req->rq_pill,
2185 LASSERT(rep != NULL);
2186 if (rep->lock_policy_res1)
2187 rc = rep->lock_policy_res1;
2191 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2193 *flags |= LDLM_FL_LVB_READY;
2194 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2195 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2198 /* Call the update callback. */
2199 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for asynchronous lock enqueue requests (installed by
 * osc_enqueue_base).
 *
 * env is the lu environment, req the enqueue request, aa the arguments
 * stored by osc_enqueue_base and rc the request result.
 *
 * The lock handle and mode are copied to locals first because the
 * structures referenced by aa may be freed once the upcall has run. The
 * handle is resolved to a lock and an extra mode reference is taken so
 * that a blocking AST posted for a failed lock arrives only after the
 * upcall. The enqueue is completed with ldlm_cli_enqueue_fini, then the
 * osc part with osc_enqueue_fini. Afterwards the reference taken by
 * ldlm_cli_enqueue is released when the handle is still in use and rc is
 * ELDLM_OK, followed by the extra reference and the lock reference
 * obtained from the handle lookup.
 */
2203 static int osc_enqueue_interpret(const struct lu_env *env,
2204 struct ptlrpc_request *req,
2205 struct osc_enqueue_args *aa, int rc)
2207 struct ldlm_lock *lock;
2208 struct lustre_handle handle;
2210 struct ost_lvb *lvb;
2212 int *flags = aa->oa_flags;
2214 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2215 * might be freed anytime after lock upcall has been called. */
2216 lustre_handle_copy(&handle, aa->oa_lockh);
2217 mode = aa->oa_ei->ei_mode;
2219 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2221 lock = ldlm_handle2lock(&handle);
2223 /* Take an additional reference so that a blocking AST that
2224 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2225 * to arrive after an upcall has been executed by
2226 * osc_enqueue_fini(). */
2227 ldlm_lock_addref(&handle, mode);
2229 /* Let CP AST to grant the lock first. */
2230 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2232 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2237 lvb_len = sizeof(*aa->oa_lvb);
2240 /* Complete obtaining the lock procedure. */
2241 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2242 mode, flags, lvb, lvb_len, &handle, rc);
2243 /* Complete osc stuff. */
2244 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2245 flags, aa->oa_agl, rc);
2247 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2249 /* Release the lock for async request. */
2250 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2252 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2253 * not already released by
2254 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2256 ldlm_lock_decref(&handle, mode);
2258 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2259 aa->oa_lockh, req, aa);
2260 ldlm_lock_decref(&handle, mode);
2261 LDLM_LOCK_PUT(lock);
/*
 * Update the per-object lock value block and known minimum size (kms)
 * after an enqueue has finished. Exported symbol.
 *
 * lov_lockhp is the handle of the lock, loi the object info to update,
 * flags the LDLM flags of the enqueue, lvb the lock value block that was
 * returned, mode the lock mode and rc the enqueue result.
 *
 * rc == ELDLM_OK: the LVB is copied into loi and a candidate kms is
 * derived from lvb_size, capped at one byte past the end of the lock
 * extent. The kms is raised to the candidate when the candidate is not
 * below the current kms, and the lock is made matchable.
 *
 * rc == ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT (a glimpse): the LVB is
 * copied, the lock is made matchable and the kms is left alone.
 *
 * The remaining visible path calls ldlm_lock_fail_match. The lock
 * reference taken by the handle lookup is dropped with LDLM_LOCK_PUT.
 */
2265 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2266 struct lov_oinfo *loi, int flags,
2267 struct ost_lvb *lvb, __u32 mode, int rc)
2269 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2271 if (rc == ELDLM_OK) {
2274 LASSERT(lock != NULL);
2275 loi->loi_lvb = *lvb;
2276 tmp = loi->loi_lvb.lvb_size;
2277 /* Extend KMS up to the end of this lock and no further
2278 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2279 if (tmp > lock->l_policy_data.l_extent.end)
2280 tmp = lock->l_policy_data.l_extent.end + 1;
2281 if (tmp >= loi->loi_kms) {
2282 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2283 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2284 loi_kms_set(loi, tmp);
2286 LDLM_DEBUG(lock, "lock acquired, setting rss="
2287 LPU64"; leaving kms="LPU64", end="LPU64,
2288 loi->loi_lvb.lvb_size, loi->loi_kms,
2289 lock->l_policy_data.l_extent.end);
2291 ldlm_lock_allow_match(lock);
2292 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2293 LASSERT(lock != NULL);
2294 loi->loi_lvb = *lvb;
2295 ldlm_lock_allow_match(lock);
2296 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2297 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2303 ldlm_lock_fail_match(lock);
2305 LDLM_LOCK_PUT(lock);
2308 EXPORT_SYMBOL(osc_update_enqueue);
/*
 * Sentinel request set pointer; the value 1 is never dereferenced here.
 * osc_enqueue_base compares its rqset argument against it and, on a match,
 * queues the request with ptlrpcd_add_req.
 */
2310 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/*
 * Obtain an extent lock on an object, reusing a cached lock when possible
 * and otherwise enqueueing a new one, synchronously or asynchronously.
 *
 * exp is the export, res_id names the resource, flags is the LDLM flag
 * word (read and updated), policy holds the requested extent (widened in
 * place to page boundaries), lvb receives the lock value block, kms_valid
 * tells whether cached state may be trusted, upcall and cookie form the
 * completion callback, einfo carries mode, type and AST callbacks, lockh
 * receives the lock handle, rqset selects the request set for async use
 * (PTLRPCD_SET routes the request to ptlrpcd), async is nonzero for an
 * asynchronous enqueue and agl is nonzero for asynchronous glimpse locks.
 *
 * Match phase: ldlm_lock_match looks for an existing covering lock using
 * flags combined with LDLM_FL_LVB_READY (omitted for agl). For an agl
 * request whose matched lock has no LVB yet, the references are dropped
 * and the stripe is skipped. Otherwise, if the callback data can be
 * attached to the matched lock, LDLM_FL_LVB_READY is set in flags, the
 * upcall is invoked with ELDLM_OK and the references are adjusted for the
 * case where a PW lock satisfied a PR request.
 *
 * Enqueue phase: for intent enqueues a request in RQF_LDLM_ENQUEUE_LVB
 * format is allocated and prepared. LDLM_FL_BLOCK_GRANTED is cleared from
 * flags and ldlm_cli_enqueue is called. In the async case the arguments
 * are stored in the request, osc_enqueue_interpret becomes the reply
 * callback and the request is queued; otherwise osc_enqueue_fini runs
 * directly and the request is released.
 */
2312 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2313 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2314 * other synchronous requests, however keeping some locks and trying to obtain
2315 * others may take a considerable amount of time in a case of ost failure; and
2316 * when other sync requests do not get released lock from a client, the client
2317 * is excluded from the cluster -- such scenarios make the life difficult, so
2318 * release locks just after they are obtained. */
2319 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2320 int *flags, ldlm_policy_data_t *policy,
2321 struct ost_lvb *lvb, int kms_valid,
2322 obd_enqueue_update_f upcall, void *cookie,
2323 struct ldlm_enqueue_info *einfo,
2324 struct lustre_handle *lockh,
2325 struct ptlrpc_request_set *rqset, int async, int agl)
2327 struct obd_device *obd = exp->exp_obd;
2328 struct ptlrpc_request *req = NULL;
2329 int intent = *flags & LDLM_FL_HAS_INTENT;
2330 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2335 /* Filesystem lock extents are extended to page boundaries so that
2336 * dealing with the page cache is a little smoother. */
2337 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2338 policy->l_extent.end |= ~CFS_PAGE_MASK;
2341 * kms is not valid when either object is completely fresh (so that no
2342 * locks are cached), or object was evicted. In the latter case cached
2343 * lock cannot be used, because it would prime inode state with
2344 * potentially stale LVB.
2349 /* Next, search for already existing extent locks that will cover us */
2350 /* If we're trying to read, we also search for an existing PW lock. The
2351 * VFS and page cache already protect us locally, so lots of readers/
2352 * writers can share a single PW lock.
2354 * There are problems with conversion deadlocks, so instead of
2355 * converting a read lock to a write lock, we'll just enqueue a new
2358 * At some point we should cancel the read lock instead of making them
2359 * send us a blocking callback, but there are problems with canceling
2360 * locks out from other users right now, too. */
2361 mode = einfo->ei_mode;
2362 if (einfo->ei_mode == LCK_PR)
2364 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2365 einfo->ei_type, policy, mode, lockh, 0);
2367 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2369 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2370 /* For AGL, if enqueue RPC is sent but the lock is not
2371 * granted, then skip to process this stripe.
2372 * Return -ECANCELED to tell the caller. */
2373 ldlm_lock_decref(lockh, mode);
2374 LDLM_LOCK_PUT(matched);
2376 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2377 *flags |= LDLM_FL_LVB_READY;
2378 /* addref the lock only if not async requests and PW
2379 * lock is matched whereas we asked for PR. */
2380 if (!rqset && einfo->ei_mode != mode)
2381 ldlm_lock_addref(lockh, LCK_PR);
2383 /* I would like to be able to ASSERT here that
2384 * rss <= kms, but I can't, for reasons which
2385 * are explained in lov_enqueue() */
2388 /* We already have a lock, and it's referenced */
2389 (*upcall)(cookie, ELDLM_OK);
2391 if (einfo->ei_mode != mode)
2392 ldlm_lock_decref(lockh, LCK_PW);
2394 /* For async requests, decref the lock. */
2395 ldlm_lock_decref(lockh, einfo->ei_mode);
2396 LDLM_LOCK_PUT(matched);
2399 ldlm_lock_decref(lockh, mode);
2400 LDLM_LOCK_PUT(matched);
2406 CFS_LIST_HEAD(cancels);
2407 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2408 &RQF_LDLM_ENQUEUE_LVB);
2412 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2414 ptlrpc_request_free(req);
2418 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2420 ptlrpc_request_set_replen(req);
2423 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2424 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2426 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2427 sizeof(*lvb), lockh, async);
2430 struct osc_enqueue_args *aa;
2431 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2432 aa = ptlrpc_req_async_args(req);
2435 aa->oa_flags = flags;
2436 aa->oa_upcall = upcall;
2437 aa->oa_cookie = cookie;
2439 aa->oa_lockh = lockh;
2442 req->rq_interpret_reply =
2443 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2444 if (rqset == PTLRPCD_SET)
2445 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2447 ptlrpc_set_add_req(rqset, req);
2448 } else if (intent) {
2449 ptlrpc_req_finished(req);
2454 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2456 ptlrpc_req_finished(req);
/* obd_ops enqueue entry point.
 *
 * Builds the LDLM resource name from the object id and sequence stored in
 * oinfo->oi_md and forwards the request to osc_enqueue_base(), taking the
 * flags, policy, callback and lock handle from @oinfo and the LVB and
 * loi_kms_valid from the first lsm_oinfo[] slot.  The last two arguments
 * passed are (rqset != NULL) and 0.
 * NOTE(review): those two presumably select async mode and disable AGL;
 * confirm against the osc_enqueue_base() prototype. */
2461 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2462 struct ldlm_enqueue_info *einfo,
2463 struct ptlrpc_request_set *rqset)
2465 struct ldlm_res_id res_id;
2469 osc_build_res_name(oinfo->oi_md->lsm_object_id,
2470 oinfo->oi_md->lsm_object_seq, &res_id);
2472 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2473 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2474 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2475 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2476 rqset, rqset != NULL, 0);
/* Try to find an existing lock on @res_id that covers the extent in @policy.
 *
 * The extent in @policy is widened in place to page boundaries before the
 * lookup.  ldlm_lock_match() is called with a local copy of *flags; on a
 * match, the lock's data is checked against @data through
 * osc_set_data_with_check(), and the reference is dropped again if that
 * check fails (unless LDLM_FL_TEST_LOCK was requested).  When the matched
 * mode differs from the requested @mode, a LCK_PR reference is taken and
 * the LCK_PW one released. */
2480 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2481 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2482 int *flags, void *data, struct lustre_handle *lockh,
2485 struct obd_device *obd = exp->exp_obd;
2486 int lflags = *flags;
2490 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2493 /* Filesystem lock extents are extended to page boundaries so that
2494 * dealing with the page cache is a little smoother */
2495 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2496 policy->l_extent.end |= ~CFS_PAGE_MASK;
2498 /* Next, search for already existing extent locks that will cover us */
2499 /* If we're trying to read, we also search for an existing PW lock. The
2500 * VFS and page cache already protect us locally, so lots of readers/
2501 * writers can share a single PW lock. */
2505 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2506 res_id, type, policy, rc, lockh, unref);
/* the matched lock does not carry the expected data: give the reference
 * back unless this was only a test lookup */
2509 if (!osc_set_data_with_check(lockh, data)) {
2510 if (!(lflags & LDLM_FL_TEST_LOCK))
2511 ldlm_lock_decref(lockh, rc);
/* matched mode differs from the requested one: hold the lock through a
 * LCK_PR reference instead of the LCK_PW one */
2515 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2516 ldlm_lock_addref(lockh, LCK_PR);
2517 ldlm_lock_decref(lockh, LCK_PW);
/* Release one @mode reference on the lock identified by @lockh.
 * LCK_GROUP references go through ldlm_lock_decref_and_cancel(); the plain
 * ldlm_lock_decref() below is presumably the else branch (the `else` line
 * is not part of this listing -- confirm). */
2524 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2528 if (unlikely(mode == LCK_GROUP))
2529 ldlm_lock_decref_and_cancel(lockh, mode);
2531 ldlm_lock_decref(lockh, mode);
/* obd_ops cancel entry point: forwards to osc_cancel_base().  @exp and @md
 * are not referenced in the lines shown here. */
2536 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2537 __u32 mode, struct lustre_handle *lockh)
2540 RETURN(osc_cancel_base(lockh, mode));
/* Cancel unused locks in this OSC's namespace via ldlm_cli_cancel_unused().
 * @resp starts out NULL and is set to the resource name built from @lsm's
 * object id and sequence; the condition guarding that assignment is not
 * visible in this listing (presumably lsm != NULL -- confirm). */
2543 static int osc_cancel_unused(struct obd_export *exp,
2544 struct lov_stripe_md *lsm,
2545 ldlm_cancel_flags_t flags,
2548 struct obd_device *obd = class_exp2obd(exp);
2549 struct ldlm_res_id res_id, *resp = NULL;
2552 resp = osc_build_res_name(lsm->lsm_object_id,
2553 lsm->lsm_object_seq, &res_id);
2556 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/* Reply interpreter installed by osc_statfs_async() for OST_STATFS.
 *
 * With a reply in hand it refreshes the DEGRADED, RDONLY, NOSPC and
 * NOSPC_BLK bits of cl_oscc.oscc_flags under oscc_lock, copies the server's
 * obd_statfs into aa->aa_oi->oi_osfs and calls the caller's oi_cb_up
 * callback with rc.  A missing reply buffer is reported as -EPROTO. */
2559 static int osc_statfs_interpret(const struct lu_env *env,
2560 struct ptlrpc_request *req,
2561 struct osc_async_args *aa, int rc)
2563 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2564 struct obd_statfs *msfs;
2569 /* The request has in fact never been sent
2570 * due to issues at a higher level (LOV).
2571 * Exit immediately since the caller is
2572 * aware of the problem and takes care
2573 * of the clean up */
/* connection-type failures are special-cased for OBD_STATFS_NODELAY
 * callers; the action taken is on a line not shown in this listing */
2576 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2577 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2583 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2585 GOTO(out, rc = -EPROTO);
2588 /* Reinitialize the RDONLY and DEGRADED flags at the client
2589 * on each statfs, so they don't stay set permanently. */
2590 cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2592 if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2593 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2594 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2595 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2597 if (unlikely(msfs->os_state & OS_STATE_READONLY))
2598 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2599 else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2600 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2602 /* Add a bit of hysteresis so this flag isn't continually flapping,
2603 * and ensure that new files don't get extremely fragmented due to
2604 * only a small amount of available space in the filesystem.
2605 * We want to set the NOSPC flag when there is less than ~0.1% free
2606 * and clear it when there is at least ~0.2% free space, so:
2607 * avail < ~0.1% max max = avail + used
2608 * 1025 * avail < avail + used used = blocks - free
2609 * 1024 * avail < used
2610 * 1024 * avail < blocks - free
2611 * avail < ((blocks - free) >> 10)
2613 * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
2614 * lose that amount of space so in those cases we report no space left
2615 * if there is less than 1 GB left. */
/* NOTE(review): the cap is the literal 1 << 30 and is compared against
 * os_bavail; whether that matches the "1 GB" above depends on the unit of
 * os_bavail -- confirm. */
2616 used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
2617 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2618 ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2619 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2620 else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2621 (msfs->os_ffree > 64) &&
2622 (msfs->os_bavail > (used << 1)))) {
2623 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2624 OSCC_FLAG_NOSPC_BLK);
/* NOSPC caused by lack of blocks (not only of free objects) is recorded
 * separately in NOSPC_BLK */
2627 if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2628 (msfs->os_bavail < used)))
2629 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2631 cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2633 *aa->aa_oi->oi_osfs = *msfs;
2635 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_STATFS request on @rqset.
 *
 * The request is directed at OST_CREATE_PORTAL and, when
 * OBD_STATFS_NODELAY is set in oinfo->oi_flags, is marked no-resend and
 * no-delay.  The reply is handled by osc_statfs_interpret().  @max_age is
 * not used by the lines shown here (see the comment in the body). */
2639 static int osc_statfs_async(struct obd_export *exp,
2640 struct obd_info *oinfo, __u64 max_age,
2641 struct ptlrpc_request_set *rqset)
2643 struct obd_device *obd = class_exp2obd(exp);
2644 struct ptlrpc_request *req;
2645 struct osc_async_args *aa;
2649 /* We could possibly pass max_age in the request (as an absolute
2650 * timestamp or a "seconds.usec ago") so the target can avoid doing
2651 * extra calls into the filesystem if that isn't necessary (e.g.
2652 * during mount that would help a bit). Having relative timestamps
2653 * is not so great if request processing is slow, while absolute
2654 * timestamps are not ideal because they need time synchronization. */
2655 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2659 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2661 ptlrpc_request_free(req);
2664 ptlrpc_request_set_replen(req);
2665 req->rq_request_portal = OST_CREATE_PORTAL;
2666 ptlrpc_at_set_req_timeout(req);
2668 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2669 /* procfs requests must not wait on statfs, to avoid a deadlock */
2670 req->rq_no_resend = 1;
2671 req->rq_no_delay = 1;
2674 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
/* the callback arguments live inside the request itself, so they must fit
 * in rq_async_args (checked at compile time) */
2675 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2676 aa = ptlrpc_req_async_args(req);
2679 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: send OST_STATFS and wait for the reply.
 *
 * A reference on the import is taken while holding cl_sem for reading and
 * is put again once the request has been allocated from it.  The request
 * is directed at OST_CREATE_PORTAL and, with OBD_STATFS_NODELAY in @flags,
 * is marked no-resend and no-delay.  A missing reply buffer yields -EPROTO.
 * @max_age is not used by the lines shown here.
 * NOTE(review): the copy of the reply into @osfs is presumably on a line
 * missing from this listing -- confirm. */
2683 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2684 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2686 struct obd_device *obd = class_exp2obd(exp);
2687 struct obd_statfs *msfs;
2688 struct ptlrpc_request *req;
2689 struct obd_import *imp = NULL;
2693 /* Since the request might also come from lprocfs, we need to
2694 * sync this with client_disconnect_export (Bug 15684). */
2695 cfs_down_read(&obd->u.cli.cl_sem);
2696 if (obd->u.cli.cl_import)
2697 imp = class_import_get(obd->u.cli.cl_import);
2698 cfs_up_read(&obd->u.cli.cl_sem);
2702 /* We could possibly pass max_age in the request (as an absolute
2703 * timestamp or a "seconds.usec ago") so the target can avoid doing
2704 * extra calls into the filesystem if that isn't necessary (e.g.
2705 * during mount that would help a bit). Having relative timestamps
2706 * is not so great if request processing is slow, while absolute
2707 * timestamps are not ideal because they need time synchronization. */
2708 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2710 class_import_put(imp);
2715 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2717 ptlrpc_request_free(req);
2720 ptlrpc_request_set_replen(req);
2721 req->rq_request_portal = OST_CREATE_PORTAL;
2722 ptlrpc_at_set_req_timeout(req);
2724 if (flags & OBD_STATFS_NODELAY) {
2725 /* procfs requests must not wait on statfs, to avoid a deadlock */
2726 req->rq_no_resend = 1;
2727 req->rq_no_delay = 1;
2730 rc = ptlrpc_queue_wait(req);
2734 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2736 GOTO(out, rc = -EPROTO);
2743 ptlrpc_req_finished(req);
2747 /* Retrieve object striping information.
2749 * @lump is a pointer to a user-space struct with lmm_stripe_count indicating
2750 * the maximum number of OST indices which will fit in the user buffer.
2751 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we only use 1
 * slot here).
 */
2753 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2755 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2756 struct lov_user_md_v3 lum, *lumk;
2757 struct lov_user_ost_data_v1 *lmm_objects;
2758 int rc = 0, lum_size;
2764 /* we only need the header part from user space to get lmm_magic and
2765 * lmm_stripe_count, (the header part is common to v1 and v3) */
2766 lum_size = sizeof(struct lov_user_md_v1);
2767 if (cfs_copy_from_user(&lum, lump, lum_size))
/* only the v1 and v3 user layouts are accepted */
2770 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2771 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2774 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2775 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2776 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2777 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2779 /* we can use lov_mds_md_size() to compute lum_size
2780 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2781 if (lum.lmm_stripe_count > 0) {
2782 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2783 OBD_ALLOC(lumk, lum_size);
/* lumk is typed as v3, so a v1 reply needs a cast to reach the v1
 * lmm_objects member */
2787 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2788 lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2790 lmm_objects = &(lumk->lmm_objects[0]);
2791 lmm_objects->l_object_id = lsm->lsm_object_id;
/* caller left no room for object entries: size the reply for the header
 * only */
2793 lum_size = lov_mds_md_size(0, lum.lmm_magic);
/* an OSC object is always reported as a single stripe */
2797 lumk->lmm_object_id = lsm->lsm_object_id;
2798 lumk->lmm_object_seq = lsm->lsm_object_seq;
2799 lumk->lmm_stripe_count = 1;
2801 if (cfs_copy_to_user(lump, lumk, lum_size))
2805 OBD_FREE(lumk, lum_size);
/* ioctl dispatcher for the OSC device.
 *
 * A module reference is held for the duration of the call and dropped with
 * cfs_module_put() at the end.  Commands handled in the lines shown:
 *   OBD_IOC_LOV_GET_CONFIG  - fill a lov_desc describing a single target
 *                             and copy it, with the obd uuid, back to @uarg
 *   LL_IOC_LOV_SETSTRIPE    - obd_alloc_memmd()
 *   LL_IOC_LOV_GETSTRIPE    - osc_getstripe()
 *   OBD_IOC_CLIENT_RECOVER  - ptlrpc_recover_import()
 *   IOC_OSC_SET_ACTIVE      - ptlrpc_set_import_active()
 *   OBD_IOC_POLL_QUOTACHECK - osc_quota_poll_check()
 *   OBD_IOC_PING_TARGET     - ptlrpc_obd_ping()
 * Any other command is logged and answered with -ENOTTY. */
2811 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2812 void *karg, void *uarg)
2814 struct obd_device *obd = exp->exp_obd;
2815 struct obd_ioctl_data *data = karg;
/* NOTE(review): the message below has no trailing newline, unlike the other
 * CERROR calls in this file */
2819 if (!cfs_try_module_get(THIS_MODULE)) {
2820 CERROR("Can't get module. Is it alive?");
2824 case OBD_IOC_LOV_GET_CONFIG: {
2826 struct lov_desc *desc;
2827 struct obd_uuid uuid;
2831 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2832 GOTO(out, err = -EINVAL);
2834 data = (struct obd_ioctl_data *)buf;
/* both inline buffers must be large enough for what is written into them
 * below */
2836 if (sizeof(*desc) > data->ioc_inllen1) {
2837 obd_ioctl_freedata(buf, len);
2838 GOTO(out, err = -EINVAL);
2841 if (data->ioc_inllen2 < sizeof(uuid)) {
2842 obd_ioctl_freedata(buf, len);
2843 GOTO(out, err = -EINVAL);
2846 desc = (struct lov_desc *)data->ioc_inlbuf1;
2847 desc->ld_tgt_count = 1;
2848 desc->ld_active_tgt_count = 1;
2849 desc->ld_default_stripe_count = 1;
2850 desc->ld_default_stripe_size = 0;
2851 desc->ld_default_stripe_offset = 0;
2852 desc->ld_pattern = 0;
2853 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2855 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2857 err = cfs_copy_to_user((void *)uarg, buf, len);
2860 obd_ioctl_freedata(buf, len);
2863 case LL_IOC_LOV_SETSTRIPE:
2864 err = obd_alloc_memmd(exp, karg);
2868 case LL_IOC_LOV_GETSTRIPE:
2869 err = osc_getstripe(karg, uarg);
2871 case OBD_IOC_CLIENT_RECOVER:
2872 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2873 data->ioc_inlbuf1, 0);
2877 case IOC_OSC_SET_ACTIVE:
2878 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2881 case OBD_IOC_POLL_QUOTACHECK:
2882 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2884 case OBD_IOC_PING_TARGET:
2885 err = ptlrpc_obd_ping(obd);
2888 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2889 cmd, cfs_curproc_comm());
2890 GOTO(out, err = -ENOTTY);
2893 cfs_module_put(THIS_MODULE);
/* Fetch a keyed value for this OSC; @vallen and @val must both be non-NULL.
 *
 * Keys handled in the lines shown:
 *   KEY_LOCK_TO_STRIPE - answered locally; *vallen becomes sizeof(__u32)
 *   KEY_LAST_ID        - OST_GET_INFO RPC sent with no-delay and no-resend;
 *                        the obd_id from the reply is stored through @val
 *   KEY_FIEMAP         - OST_GET_INFO RPC carrying @key and the caller's
 *                        @val (*vallen bytes); the reply overwrites @val
 * A missing reply buffer is reported as -EPROTO.  @lsm is not referenced in
 * the lines shown here. */
2897 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2898 obd_count keylen, void *key, __u32 *vallen, void *val,
2899 struct lov_stripe_md *lsm)
2902 if (!vallen || !val)
2905 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2906 __u32 *stripe = val;
2907 *vallen = sizeof(*stripe);
2910 } else if (KEY_IS(KEY_LAST_ID)) {
2911 struct ptlrpc_request *req;
2916 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2917 &RQF_OST_GET_INFO_LAST_ID);
2921 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2922 RCL_CLIENT, keylen);
2923 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2925 ptlrpc_request_free(req);
2929 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2930 memcpy(tmp, key, keylen);
2932 req->rq_no_delay = req->rq_no_resend = 1;
2933 ptlrpc_request_set_replen(req);
2934 rc = ptlrpc_queue_wait(req);
2938 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2940 GOTO(out, rc = -EPROTO);
2942 *((obd_id *)val) = *reply;
2944 ptlrpc_req_finished(req);
2946 } else if (KEY_IS(KEY_FIEMAP)) {
2947 struct ptlrpc_request *req;
2948 struct ll_user_fiemap *reply;
2952 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2953 &RQF_OST_GET_INFO_FIEMAP);
/* the fiemap value buffer travels in both directions, so the same size is
 * reserved on the client and on the server side */
2957 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2958 RCL_CLIENT, keylen);
2959 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2960 RCL_CLIENT, *vallen);
2961 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2962 RCL_SERVER, *vallen);
2964 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2966 ptlrpc_request_free(req);
2970 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2971 memcpy(tmp, key, keylen);
2972 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2973 memcpy(tmp, val, *vallen);
2975 ptlrpc_request_set_replen(req);
2976 rc = ptlrpc_queue_wait(req);
2980 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2982 GOTO(out1, rc = -EPROTO);
2984 memcpy(val, reply, *vallen);
2986 ptlrpc_req_finished(req);
/* Finish an MDS connection on @imp: connect the LLOG_MDS_OST_ORIG_CTXT
 * llog context of the import's obd, then mark the import as
 * imp_server_timeout and imp_pingable under imp_lock.  What is done with a
 * failed llog_initiator_connect() is left open by the XXX note below. */
2994 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
2996 struct llog_ctxt *ctxt;
3000 ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3002 rc = llog_initiator_connect(ctxt);
3003 llog_ctxt_put(ctxt);
3005 /* XXX return an error? skip setting below flags? */
3008 cfs_spin_lock(&imp->imp_lock);
3009 imp->imp_server_timeout = 1;
3010 imp->imp_pingable = 1;
3011 cfs_spin_unlock(&imp->imp_lock);
3012 CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
/* Reply interpreter for the KEY_MDS_CONN set_info request: hands the
 * request's import to osc_setinfo_mds_connect_import().
 * NOTE(review): the remaining parameters and any check of the RPC result
 * are on lines missing from this listing -- confirm. */
3017 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3018 struct ptlrpc_request *req,
3025 RETURN(osc_setinfo_mds_connect_import(req->rq_import));
/* Set a parameter on this OSC, locally or by forwarding it to the OST.
 *
 * KEY_NEXT_ID, KEY_CHECKSUM, KEY_SPTLRPC_CONF and KEY_FLUSH_CTX are handled
 * locally in the branches below.  Any other key is packed into an
 * OST_SET_INFO request: KEY_GRANT_SHRINK uses RQF_OST_SET_GRANT_INFO and is
 * handed to ptlrpcd, everything else uses RQF_OBD_SET_INFO and is added to
 * @set.  KEY_MDS_CONN additionally records the sequence from @val in
 * cl_oscc.oscc_oa and installs osc_setinfo_mds_conn_interpret() as the
 * reply handler. */
3028 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3029 obd_count keylen, void *key, obd_count vallen,
3030 void *val, struct ptlrpc_request_set *set)
3032 struct ptlrpc_request *req;
3033 struct obd_device *obd = exp->exp_obd;
3034 struct obd_import *imp = class_exp2cliimp(exp);
3039 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3041 if (KEY_IS(KEY_NEXT_ID)) {
3043 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3045 if (vallen != sizeof(obd_id))
/* NOTE(review): vallen is compared against sizeof(obd_id) a second time
 * here */
3050 if (vallen != sizeof(obd_id))
3053 /* avoid race between allocate new object and set next id
3054 * from ll_sync thread */
3055 cfs_spin_lock(&oscc->oscc_lock);
/* oscc_next_id only ever moves forward */
3056 new_val = *((obd_id*)val) + 1;
3057 if (new_val > oscc->oscc_next_id)
3058 oscc->oscc_next_id = new_val;
3059 cfs_spin_unlock(&oscc->oscc_lock);
3060 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3061 exp->exp_obd->obd_name,
3062 obd->u.cli.cl_oscc.oscc_next_id);
3067 if (KEY_IS(KEY_CHECKSUM)) {
3068 if (vallen != sizeof(int))
3070 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3074 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3075 sptlrpc_conf_client_adapt(obd);
3079 if (KEY_IS(KEY_FLUSH_CTX)) {
3080 sptlrpc_import_flush_my_ctx(imp);
/* every remaining key except KEY_GRANT_SHRINK needs a request set */
3084 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3087 /* We pass all other commands directly to OST. Since nobody calls osc
3088 methods directly and everybody is supposed to go through LOV, we
3089 assume lov checked invalid values for us.
3090 The only recognised values so far are evict_by_nid and mds_conn.
3091 Even if something bad goes through, we'd get a -EINVAL from OST
3094 if (KEY_IS(KEY_GRANT_SHRINK))
3095 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3097 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3102 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3103 RCL_CLIENT, keylen);
3104 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3105 RCL_CLIENT, vallen);
3106 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3108 ptlrpc_request_free(req);
3112 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3113 memcpy(tmp, key, keylen);
3114 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3115 memcpy(tmp, val, vallen);
3117 if (KEY_IS(KEY_MDS_CONN)) {
3118 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3120 oscc->oscc_oa.o_seq = (*(__u32 *)val);
3121 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3122 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
3123 req->rq_no_delay = req->rq_no_resend = 1;
3124 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3125 } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3126 struct osc_grant_args *aa;
3129 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3130 aa = ptlrpc_req_async_args(req);
3133 ptlrpc_req_finished(req);
3136 *oa = ((struct ost_body *)val)->oa;
3138 req->rq_interpret_reply = osc_shrink_grant_interpret;
3141 ptlrpc_request_set_replen(req);
3142 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3143 LASSERT(set != NULL);
3144 ptlrpc_set_add_req(set, req);
3145 ptlrpc_check_set(NULL, set);
3147 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* llog operations used for LLOG_SIZE_REPL_CTXT (see __osc_llog_init());
 * lop_cancel is the only method set in the line shown. */
3153 static struct llog_operations osc_size_repl_logops = {
3154 lop_cancel: llog_obd_repl_cancel
/* llog operations used for LLOG_MDS_OST_ORIG_CTXT; filled in at module load
 * by osc_init(), which copies llog_lvfs_ops and overrides four methods. */
3157 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts of an OSC: LLOG_MDS_OST_ORIG_CTXT (using the
 * log id from @catid) and LLOG_SIZE_REPL_CTXT.  Both are registered on
 * obd->obd_olg; @olg itself is not referenced in the lines shown.  If the
 * second setup fails the first context is looked up again (presumably to
 * clean it up on a line not shown -- confirm), and the failure is logged
 * together with the catalog id. */
3159 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3160 struct obd_device *tgt, struct llog_catid *catid)
3165 rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3166 &catid->lci_logid, &osc_mds_ost_orig_logops);
3168 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3172 rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
3173 NULL, &osc_size_repl_logops);
3175 struct llog_ctxt *ctxt =
3176 llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3179 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3184 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
3185 obd->obd_name, tgt->obd_name, catid, rc);
3186 CERROR("logid "LPX64":0x%x\n",
3187 catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
/* obd_ops llog_init method.
 *
 * With olg_cat_processing held, reads the catalog id stored at slot *index
 * of the CATLIST file on @disk_obd, initializes the llog contexts through
 * __osc_llog_init() and writes the catalog id back to the same slot.  Each
 * failing step is logged.  @olg must be the obd's own obd_olg. */
3192 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3193 struct obd_device *disk_obd, int *index)
3195 struct llog_catid catid;
3196 static char name[32] = CATLIST;
3200 LASSERT(olg == &obd->obd_olg);
3202 cfs_mutex_lock(&olg->olg_cat_processing);
3203 rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
3205 CERROR("rc: %d\n", rc);
3209 CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
3210 obd->obd_name, *index, catid.lci_logid.lgl_oid,
3211 catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
3213 rc = __osc_llog_init(obd, olg, disk_obd, &catid);
3215 CERROR("rc: %d\n", rc);
3219 rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
3221 CERROR("rc: %d\n", rc);
3226 cfs_mutex_unlock(&olg->olg_cat_processing);
/* obd_ops llog_finish method: runs llog_cleanup() on the
 * LLOG_MDS_OST_ORIG_CTXT and LLOG_SIZE_REPL_CTXT contexts, keeping the two
 * results in rc and rc2.  @count is not referenced in the lines shown; how
 * the two results are combined is on a line missing from this listing. */
3231 static int osc_llog_finish(struct obd_device *obd, int count)
3233 struct llog_ctxt *ctxt;
3234 int rc = 0, rc2 = 0;
3237 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3239 rc = llog_cleanup(ctxt);
3241 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3243 rc2 = llog_cleanup(ctxt);
/* obd_ops reconnect method.
 *
 * When @data asks for OBD_CONNECT_GRANT, the grant requested from the
 * server is recomputed under cl_loi_list_lock and cl_lost_grant is reset
 * to zero; the values are then traced.  Nothing is done when @data is NULL
 * or the flag is absent.  @env, @exp and @cluuid are not referenced in the
 * lines shown. */
3250 static int osc_reconnect(const struct lu_env *env,
3251 struct obd_export *exp, struct obd_device *obd,
3252 struct obd_uuid *cluuid,
3253 struct obd_connect_data *data,
3256 struct client_obd *cli = &obd->u.cli;
3258 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3261 client_obd_list_lock(&cli->cl_loi_list_lock);
/* ask for what is already held (available + dirty); if that is zero, fall
 * back to 2 * cl_max_pages_per_rpc pages expressed in bytes (GNU "a ?: b"
 * shorthand for "a ? a : b") */
3262 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3263 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3264 lost_grant = cli->cl_lost_grant;
3265 cli->cl_lost_grant = 0;
3266 client_obd_list_unlock(&cli->cl_loi_list_lock);
3268 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3269 "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
3270 cli->cl_avail_grant, cli->cl_dirty, lost_grant);
3271 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3272 " ocd_grant: %d\n", data->ocd_connect_flags,
3273 data->ocd_version, data->ocd_grant);
/* obd_ops disconnect method.
 *
 * If a LLOG_SIZE_REPL_CTXT context exists it is released after an
 * llog_sync() that is only issued when this is the last connection
 * (cl_conn_count == 1); otherwise its absence is traced.  The export is
 * then torn down with client_disconnect_export(), and the client is
 * removed from the grant shrink list once cl_import is NULL (see the
 * ordering note in the body). */
3279 static int osc_disconnect(struct obd_export *exp)
3281 struct obd_device *obd = class_exp2obd(exp);
3282 struct llog_ctxt *ctxt;
3285 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3287 if (obd->u.cli.cl_conn_count == 1) {
3288 /* Flush any remaining cancel messages out to the
3290 llog_sync(ctxt, exp);
3292 llog_ctxt_put(ctxt);
3294 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3298 rc = client_disconnect_export(exp);
3300 * Initially we put del_shrink_grant before disconnect_export, but it
3301 * causes the following problem if setup (connect) and cleanup
3302 * (disconnect) are tangled together.
3303 * connect p1 disconnect p2
3304 * ptlrpc_connect_import
3305 * ............... class_manual_cleanup
3308 * ptlrpc_connect_interrupt
3310 * add this client to shrink list
3312 * Bang! pinger trigger the shrink.
3313 * So the osc should be disconnected from the shrink list, after we
3314 * are sure the import has been destroyed. BUG18662
3316 if (obd->u.cli.cl_import == NULL)
3317 osc_del_shrink_grant(&obd->u.cli);
/* obd_ops import_event method: react to state changes of @imp, which must
 * belong to @obd.
 *
 *   IMP_EVENT_DISCON     - on MDS OSCs flag the object creator as
 *                          recovering; reset cl_avail_grant and
 *                          cl_lost_grant
 *   IMP_EVENT_INACTIVE   - notify the observer (OBD_NOTIFY_INACTIVE)
 *   IMP_EVENT_INVALIDATE - unplug queued I/O and clean the lock namespace
 *                          with LDLM_FL_LOCAL_ONLY
 *   IMP_EVENT_ACTIVE     - on MDS OSCs clear the NOSPC flags; notify the
 *                          observer (OBD_NOTIFY_ACTIVE)
 *   IMP_EVENT_OCD        - initialize grant and the request portal from the
 *                          connect data; notify the observer
 *   IMP_EVENT_DEACTIVATE, IMP_EVENT_ACTIVATE - notify the observer
 * Unknown events are logged with CERROR. */
3321 static int osc_import_event(struct obd_device *obd,
3322 struct obd_import *imp,
3323 enum obd_import_event event)
3325 struct client_obd *cli;
3329 LASSERT(imp->imp_obd == obd);
3332 case IMP_EVENT_DISCON: {
3333 /* Only do this on the MDS OSC's */
3334 if (imp->imp_server_timeout) {
3335 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3337 cfs_spin_lock(&oscc->oscc_lock);
3338 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3339 cfs_spin_unlock(&oscc->oscc_lock);
/* grant accounting is reset on disconnect */
3342 client_obd_list_lock(&cli->cl_loi_list_lock);
3343 cli->cl_avail_grant = 0;
3344 cli->cl_lost_grant = 0;
3345 client_obd_list_unlock(&cli->cl_loi_list_lock);
3348 case IMP_EVENT_INACTIVE: {
3349 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3352 case IMP_EVENT_INVALIDATE: {
3353 struct ldlm_namespace *ns = obd->obd_namespace;
3357 env = cl_env_get(&refcheck);
3361 client_obd_list_lock(&cli->cl_loi_list_lock);
3362 /* all pages go to failing rpcs due to the invalid
3364 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3365 client_obd_list_unlock(&cli->cl_loi_list_lock);
3367 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3368 cl_env_put(env, &refcheck);
3373 case IMP_EVENT_ACTIVE: {
3374 /* Only do this on the MDS OSC's */
3375 if (imp->imp_server_timeout) {
3376 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3378 cfs_spin_lock(&oscc->oscc_lock);
3379 oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
3380 OSCC_FLAG_NOSPC_BLK);
3381 cfs_spin_unlock(&oscc->oscc_lock);
3383 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3386 case IMP_EVENT_OCD: {
3387 struct obd_connect_data *ocd = &imp->imp_connect_data;
3389 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3390 osc_init_grant(&obd->u.cli, ocd);
3393 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3394 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3396 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3399 case IMP_EVENT_DEACTIVATE: {
3400 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3403 case IMP_EVENT_ACTIVATE: {
3404 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3408 CERROR("Unknown import event %d\n", event);
/**
3415 * Determine whether the lock can be canceled before replaying the lock
3416 * during recovery, see bug16774 for detailed information.
3418 * \retval zero the lock can't be canceled
3419 * \retval other ok to cancel
 */
3421 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
/* NOTE(review): check_res_locked() presumably asserts that the caller holds
 * the resource lock -- confirm */
3423 check_res_locked(lock->l_resource);
3426 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3428 * XXX as a future improvement, we can also cancel unused write lock
3429 * if it doesn't have dirty data and active mmaps.
/* "unused" here means osc_dlm_lock_pageref() reports zero for the lock */
3431 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3432 (lock->l_granted_mode == LCK_PR ||
3433 lock->l_granted_mode == LCK_CR) &&
3434 (osc_dlm_lock_pageref(lock) == 0))
/* Writeback work callback registered in osc_setup() through
 * ptlrpcd_alloc_work().  @data is the client_obd; queued I/O is unplugged
 * with PDL_POLICY_SAME while cl_loi_list_lock is held. */
3440 static int brw_queue_work(const struct lu_env *env, void *data)
3442 struct client_obd *cli = data;
3444 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3446 client_obd_list_lock(&cli->cl_loi_list_lock);
3447 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3448 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* obd_ops setup method for an OSC device.
 *
 * Takes a ptlrpcd reference, runs the generic client_obd_setup(), allocates
 * the writeback work item (brw_queue_work), registers the procfs entries,
 * creates the import's request pool, initializes the grant shrink list and
 * registers osc_cancel_for_recovery() with the lock namespace.  The error
 * unwinding between these steps is on lines missing from this listing. */
3452 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3454 struct client_obd *cli = &obd->u.cli;
3459 rc = ptlrpcd_addref();
3463 rc = client_obd_setup(obd, lcfg);
3466 handler = ptlrpcd_alloc_work(cli->cl_import,
3467 brw_queue_work, cli);
3468 if (!IS_ERR(handler))
3469 cli->cl_writeback_work = handler;
3471 rc = PTR_ERR(handler);
3475 struct lprocfs_static_vars lvars = { 0 };
3477 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3478 lprocfs_osc_init_vars(&lvars);
3479 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3480 lproc_osc_attach_seqstat(obd);
3481 sptlrpc_lprocfs_cliobd_attach(obd);
3482 ptlrpc_lprocfs_register_obd(obd);
3486 /* We need to allocate a few requests more, because
3487 brw_interpret tries to create new requests before freeing
3488 previous ones. Ideally we want to have 2x max_rpcs_in_flight
3489 reserved, but I am afraid that might be too much wasted RAM
3490 in fact, so 2 is just my guess and still should work. */
3491 cli->cl_import->imp_rq_pool =
3492 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3494 ptlrpc_add_rqs_to_pool);
3496 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3498 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
/* obd_ops precleanup method, called once per cleanup @stage.
 *
 *   OBD_CLEANUP_EARLY   - deactivate the import and clear imp_pingable
 *   OBD_CLEANUP_EXPORTS - wait for zombie exports, destroy the writeback
 *                         work item, clean up the client import, remove the
 *                         procfs entries and finish llog; a failure of
 *                         obd_llog_finish() is logged */
3506 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3512 case OBD_CLEANUP_EARLY: {
3513 struct obd_import *imp;
3514 imp = obd->u.cli.cl_import;
3515 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3516 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3517 ptlrpc_deactivate_import(imp);
3518 cfs_spin_lock(&imp->imp_lock);
3519 imp->imp_pingable = 0;
3520 cfs_spin_unlock(&imp->imp_lock);
3523 case OBD_CLEANUP_EXPORTS: {
3524 struct client_obd *cli = &obd->u.cli;
3526 * for echo client, export may be on zombie list, wait for
3527 * zombie thread to cull it, because cli.cl_import will be
3528 * cleared in client_disconnect_export():
3529 * class_export_destroy() -> obd_cleanup() ->
3530 * echo_device_free() -> echo_client_cleanup() ->
3531 * obd_disconnect() -> osc_disconnect() ->
3532 * client_disconnect_export()
3534 obd_zombie_barrier();
3535 if (cli->cl_writeback_work) {
3536 ptlrpcd_destroy_work(cli->cl_writeback_work);
3537 cli->cl_writeback_work = NULL;
3539 obd_cleanup_client_import(obd);
3540 ptlrpc_lprocfs_unregister_obd(obd);
3541 lprocfs_obd_cleanup(obd);
3542 rc = obd_llog_finish(obd, 0);
3544 CERROR("failed to cleanup llogging subsystems\n");
/* obd_ops cleanup method: releases the OSC quota cache and then runs the
 * generic client_obd_cleanup().  Anything done after that call is on lines
 * missing from this listing. */
3551 int osc_cleanup(struct obd_device *obd)
3557 /* free memory of osc quota cache */
3558 osc_quota_cleanup(obd);
3560 rc = client_obd_cleanup(obd);
/* Apply a configuration record to the OSC.  The record is dispatched on
 * lcfg->lcfg_command; the one branch visible in this listing passes it to
 * class_process_proc_param() with the PARAM_OSC prefix and the OSC procfs
 * variable table.  The case labels are on lines not shown. */
3566 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3568 struct lprocfs_static_vars lvars = { 0 };
3571 lprocfs_osc_init_vars(&lvars);
3573 switch (lcfg->lcfg_command) {
3575 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops process_config method: @buf is handed to
 * osc_process_config_base() as the configuration record; @len is unused. */
3585 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3587 return osc_process_config_base(obd, buf);
/* Method table of the OSC obd type, registered in osc_init() through
 * class_register_type().  Connection management for add_conn, del_conn and
 * connect uses the generic client_import helpers; the remaining entries
 * point at the osc_ functions of this module. */
3590 struct obd_ops osc_obd_ops = {
3591 .o_owner = THIS_MODULE,
3592 .o_setup = osc_setup,
3593 .o_precleanup = osc_precleanup,
3594 .o_cleanup = osc_cleanup,
3595 .o_add_conn = client_import_add_conn,
3596 .o_del_conn = client_import_del_conn,
3597 .o_connect = client_connect_import,
3598 .o_reconnect = osc_reconnect,
3599 .o_disconnect = osc_disconnect,
3600 .o_statfs = osc_statfs,
3601 .o_statfs_async = osc_statfs_async,
3602 .o_packmd = osc_packmd,
3603 .o_unpackmd = osc_unpackmd,
3604 .o_precreate = osc_precreate,
3605 .o_create = osc_create,
3606 .o_create_async = osc_create_async,
3607 .o_destroy = osc_destroy,
3608 .o_getattr = osc_getattr,
3609 .o_getattr_async = osc_getattr_async,
3610 .o_setattr = osc_setattr,
3611 .o_setattr_async = osc_setattr_async,
3613 .o_punch = osc_punch,
3615 .o_enqueue = osc_enqueue,
3616 .o_change_cbdata = osc_change_cbdata,
3617 .o_find_cbdata = osc_find_cbdata,
3618 .o_cancel = osc_cancel,
3619 .o_cancel_unused = osc_cancel_unused,
3620 .o_iocontrol = osc_iocontrol,
3621 .o_get_info = osc_get_info,
3622 .o_set_info_async = osc_set_info_async,
3623 .o_import_event = osc_import_event,
3624 .o_llog_init = osc_llog_init,
3625 .o_llog_finish = osc_llog_finish,
3626 .o_process_config = osc_process_config,
3627 .o_quotactl = osc_quotactl,
3628 .o_quotacheck = osc_quotacheck,
3629 .o_quota_adjust_qunit = osc_quota_adjust_qunit,
/* Objects defined elsewhere in the OSC module and used by osc_init() and
 * osc_exit(): the kmem cache descriptors, and the osc_ast_guard spinlock
 * with its lockdep class. */
3632 extern struct lu_kmem_descr osc_caches[];
3633 extern cfs_spinlock_t osc_ast_guard;
3634 extern cfs_lock_class_key_t osc_ast_guard_class;
/* Module initialization.
 *
 * Creates the kmem caches described by osc_caches, registers the OSC obd
 * type with its method table, procfs module variables and device type
 * (the caches are released again on the failure path shown), initializes
 * osc_ast_guard, and builds osc_mds_ost_orig_logops from llog_lvfs_ops with
 * the setup, cleanup, add and connect methods overridden. */
3636 int __init osc_init(void)
3638 struct lprocfs_static_vars lvars = { 0 };
3642 /* print an address of _any_ initialized kernel symbol from this
3643 * module, to allow debugging with gdb that doesn't support data
3644 * symbols from modules.*/
3645 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3647 rc = lu_kmem_init(osc_caches);
3649 lprocfs_osc_init_vars(&lvars);
3652 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3653 LUSTRE_OSC_NAME, &osc_device_type);
3655 lu_kmem_fini(osc_caches);
3659 cfs_spin_lock_init(&osc_ast_guard);
3660 cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3662 osc_mds_ost_orig_logops = llog_lvfs_ops;
3663 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3664 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3665 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3666 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
/* Module teardown: unregisters the OSC obd type and releases the kmem
 * caches created in osc_init(). */
3672 static void /*__exit*/ osc_exit(void)
3675 class_unregister_type(LUSTRE_OSC_NAME);
3676 lu_kmem_fini(osc_caches);
/* Module metadata, followed by the cfs_module() declaration that names
 * osc_init() and osc_exit() as the module's entry and exit points. */
3679 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3680 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3681 MODULE_LICENSE("GPL");
3683 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);