4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
/* Per-RPC async context for bulk read/write (BRW) requests.  Stored in
 * req->rq_async_args and consumed by brw_interpret() when the RPC
 * completes.  NOTE(review): additional fields exist in the full source
 * that are not visible in this excerpt. */
55 struct osc_brw_async_args {
59 	obd_count aa_page_count;	/* number of entries in aa_ppga */
61 	struct brw_page **aa_ppga;	/* pages making up the bulk transfer */
62 	struct client_obd *aa_cli;	/* client obd that issued the RPC */
63 	struct list_head aa_oaps;	/* async pages covered by this RPC */
64 	struct list_head aa_exts;	/* osc extents covered by this RPC */
65 	struct obd_capa *aa_ocapa;	/* capability attached to the request */
66 	struct cl_req *aa_clerq;	/* cl_req this transfer belongs to */

/* Grant-shrink RPCs reuse the BRW async-args layout (see
 * osc_shrink_grant_interpret(), which casts its arg to osc_grant_args). */
69 #define osc_grant_args osc_brw_async_args
/* Async context for setattr/punch RPCs; sa_upcall(sa_cookie, rc) is
 * invoked from osc_setattr_interpret() on completion. */
71 struct osc_setattr_args {
73 	obd_enqueue_update_f sa_upcall;	/* completion callback */
/* Async context for OST_SYNC RPCs; fa_upcall(fa_cookie, rc) is invoked
 * from osc_sync_interpret() on completion. */
77 struct osc_fsync_args {
78 	struct obd_info *fa_oi;		/* obd_info whose oi_oa receives the reply */
79 	obd_enqueue_update_f fa_upcall;	/* completion callback */
/* Async context for DLM lock enqueue RPCs. */
83 struct osc_enqueue_args {
84 	struct obd_export *oa_exp;	/* export the enqueue was sent on */
88 	osc_enqueue_upcall_f oa_upcall;	/* completion callback */
90 	struct ost_lvb *oa_lvb;		/* lock value block from the server */
91 	struct lustre_handle oa_lockh;	/* handle of the enqueued lock */
92 	unsigned int oa_agl:1;		/* asynchronous glimpse lock request */
95 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Copy the capability @capa into the RMF_CAPA1 field of @req and flag
 * its presence in the OST body via OBD_MD_FLOSSCAPA.  NOTE(review): the
 * NULL-capa early return present in the full source is elided here. */
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100 struct ost_body *body, void *capa)
102 struct obd_capa *oc = (struct obd_capa *)capa;
103 struct lustre_capa *c;
108 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
111 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's OST body from @oinfo: convert the in-memory obdo
 * to wire format and attach the capability, if any. */
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
117 struct ost_body *body;
119 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
122 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
124 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability buffer in the request capsule: zero when no
 * capability is being sent, otherwise the default sizeof(obd_capa). */
127 void osc_set_capa_size(struct ptlrpc_request *req,
128 const struct req_msg_field *field,
132 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
134 /* it is already calculated as sizeof struct obd_capa */
/* Completion handler for an async OST_GETATTR: unpack the reply obdo
 * into aa->aa_oi->oi_oa, fake o_blksize (the OST does not send it),
 * then invoke the caller's oi_cb_up() callback with the final rc. */
138 int osc_getattr_interpret(const struct lu_env *env,
139 struct ptlrpc_request *req,
140 struct osc_async_args *aa, int rc)
142 struct ost_body *body;
148 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
152 aa->aa_oi->oi_oa, &body->oa);
154 /* This should really be sent by the OST */
155 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
156 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* Reply could not be unpacked: invalidate the obdo so the caller does
 * not consume stale attribute bits. */
158 CDEBUG(D_INFO, "can't unpack ost_body\n");
160 aa->aa_oi->oi_oa->o_valid = 0;
163 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Synchronous getattr: build an OST_GETATTR RPC from @oinfo, wait for
 * the reply, and unpack the returned attributes into oinfo->oi_oa.
 * o_blksize is synthesized client-side from the BRW size. */
167 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
168 struct obd_info *oinfo)
170 struct ptlrpc_request *req;
171 struct ost_body *body;
175 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
179 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
180 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: release the half-built request. */
182 ptlrpc_request_free(req);
186 osc_pack_req_body(req, oinfo);
188 ptlrpc_request_set_replen(req);
190 rc = ptlrpc_queue_wait(req);
194 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196 GOTO(out, rc = -EPROTO);
198 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
/* The OST does not report a block size; use the client BRW size. */
202 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
203 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
207 ptlrpc_req_finished(req);
/* Synchronous setattr: send an OST_SETATTR RPC carrying oinfo->oi_oa
 * and copy the server's resulting attributes back into it.  The obdo
 * must carry a valid group (FID sequence) — see the LASSERT. */
211 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
212 struct obd_info *oinfo, struct obd_trans_info *oti)
214 struct ptlrpc_request *req;
215 struct ost_body *body;
219 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
221 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
225 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
226 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Packing failed: release the half-built request. */
228 ptlrpc_request_free(req);
232 osc_pack_req_body(req, oinfo);
234 ptlrpc_request_set_replen(req);
236 rc = ptlrpc_queue_wait(req);
240 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
242 GOTO(out, rc = -EPROTO);
244 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
249 ptlrpc_req_finished(req);
/* Completion handler shared by async setattr and punch RPCs: unpack
 * the reply obdo into sa->sa_oa, then invoke the caller's upcall with
 * the final rc. */
253 static int osc_setattr_interpret(const struct lu_env *env,
254 struct ptlrpc_request *req,
255 struct osc_setattr_args *sa, int rc)
257 struct ost_body *body;
263 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
265 GOTO(out, rc = -EPROTO);
267 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
270 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous setattr: build an OST_SETATTR RPC and either hand it to
 * a ptlrpcd daemon (no reply wanted / PTLRPCD_SET) or add it to the
 * caller's @rqset.  @upcall(@cookie, rc) fires on completion via
 * osc_setattr_interpret().  NOTE(review): the branch selecting between
 * the fire-and-forget path and the interpreted path is elided in this
 * excerpt — confirm against the full source. */
274 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
275 struct obd_trans_info *oti,
276 obd_enqueue_update_f upcall, void *cookie,
277 struct ptlrpc_request_set *rqset)
279 struct ptlrpc_request *req;
280 struct osc_setattr_args *sa;
284 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
288 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
289 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Packing failed: release the half-built request. */
291 ptlrpc_request_free(req);
/* MDS-driven setattr carries the llog cookie for unlink recovery. */
295 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
296 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
298 osc_pack_req_body(req, oinfo);
300 ptlrpc_request_set_replen(req);
302 /* do mds to ost setattr asynchronously */
304 /* Do not wait for response. */
305 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
307 req->rq_interpret_reply =
308 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Stash the per-RPC context in the request's async-args area. */
310 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
311 sa = ptlrpc_req_async_args(req);
312 sa->sa_oa = oinfo->oi_oa;
313 sa->sa_upcall = upcall;
314 sa->sa_cookie = cookie;
316 if (rqset == PTLRPCD_SET)
317 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
319 ptlrpc_set_add_req(rqset, req);
/* obd_ops wrapper: async setattr with oinfo's own oi_cb_up as the
 * completion upcall and oinfo itself as the cookie. */
325 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
326 struct obd_trans_info *oti,
327 struct ptlrpc_request_set *rqset)
329 return osc_setattr_async_base(exp, oinfo, oti,
330 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronous object create on the OST.  Only used for echo-client
 * objects here — see the fid_seq_is_echo() assertion.  On success the
 * reply obdo (including the assigned object id) is copied back into
 * @oa, and the llog cookie is saved into @oti for unlink recovery. */
333 static int osc_create(const struct lu_env *env, struct obd_export *exp,
334 struct obdo *oa, struct obd_trans_info *oti)
336 struct ptlrpc_request *req;
337 struct ost_body *body;
342 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
343 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
345 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
347 GOTO(out, rc = -ENOMEM);
349 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
/* Packing failed: release the half-built request. */
351 ptlrpc_request_free(req);
355 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
358 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
360 ptlrpc_request_set_replen(req);
362 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
363 oa->o_flags == OBD_FL_DELORPHAN) {
365 "delorphan from OST integration");
366 /* Don't resend the delorphan req */
367 req->rq_no_resend = req->rq_no_delay = 1;
370 rc = ptlrpc_queue_wait(req);
374 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
376 GOTO(out_req, rc = -EPROTO);
378 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
379 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* The OST does not report a block size; use the client BRW size. */
381 oa->o_blksize = cli_brw_size(exp->exp_obd);
382 oa->o_valid |= OBD_MD_FLBLKSZ;
/* Save the unlink llog cookie so the create can be cancelled against
 * the recovery log if needed. */
385 if (oa->o_valid & OBD_MD_FLCOOKIE) {
386 if (oti->oti_logcookies == NULL)
387 oti->oti_logcookies = &oti->oti_onecookie;
389 *oti->oti_logcookies = oa->o_lcookie;
393 CDEBUG(D_HA, "transno: "LPD64"\n",
394 lustre_msg_get_transno(req->rq_repmsg));
396 ptlrpc_req_finished(req);
/* Asynchronous truncate (punch): build an OST_PUNCH RPC carrying the
 * range in oinfo->oi_oa and dispatch it to a ptlrpcd daemon or the
 * caller's @rqset.  @upcall(@cookie, rc) fires on completion via
 * osc_setattr_interpret(). */
401 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
402 obd_enqueue_update_f upcall, void *cookie,
403 struct ptlrpc_request_set *rqset)
405 struct ptlrpc_request *req;
406 struct osc_setattr_args *sa;
407 struct ost_body *body;
411 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
415 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
416 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
/* Packing failed: release the half-built request. */
418 ptlrpc_request_free(req);
421 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
422 ptlrpc_at_set_req_timeout(req);
424 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
426 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
428 osc_pack_capa(req, body, oinfo->oi_capa);
430 ptlrpc_request_set_replen(req);
/* Stash the per-RPC context in the request's async-args area. */
432 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
433 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
434 sa = ptlrpc_req_async_args(req);
435 sa->sa_oa = oinfo->oi_oa;
436 sa->sa_upcall = upcall;
437 sa->sa_cookie = cookie;
438 if (rqset == PTLRPCD_SET)
439 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
441 ptlrpc_set_add_req(rqset, req);
/* Completion handler for OST_SYNC: copy the reply obdo back into the
 * caller's obd_info and invoke the upcall with the final rc. */
446 static int osc_sync_interpret(const struct lu_env *env,
447 struct ptlrpc_request *req,
450 struct osc_fsync_args *fa = arg;
451 struct ost_body *body;
457 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
459 CERROR ("can't unpack ost_body\n");
460 GOTO(out, rc = -EPROTO);
463 *fa->fa_oi->oi_oa = body->oa;
465 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC over the byte range encoded in oinfo->oi_oa
 * (start/end are carried in the obdo's size/blocks fields).  Dispatched
 * to a ptlrpcd daemon or the caller's @rqset; @upcall(@cookie, rc)
 * fires via osc_sync_interpret(). */
469 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
470 obd_enqueue_update_f upcall, void *cookie,
471 struct ptlrpc_request_set *rqset)
473 struct ptlrpc_request *req;
474 struct ost_body *body;
475 struct osc_fsync_args *fa;
479 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
483 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
484 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
/* Packing failed: release the half-built request. */
486 ptlrpc_request_free(req);
490 /* overload the size and blocks fields in the oa with start/end */
491 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
493 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
495 osc_pack_capa(req, body, oinfo->oi_capa);
497 ptlrpc_request_set_replen(req);
498 req->rq_interpret_reply = osc_sync_interpret;
/* Stash the per-RPC context in the request's async-args area. */
500 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
501 fa = ptlrpc_req_async_args(req);
503 fa->fa_upcall = upcall;
504 fa->fa_cookie = cookie;
506 if (rqset == PTLRPCD_SET)
507 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
509 ptlrpc_set_add_req(rqset, req);
514 /* Find and cancel locally locks matched by @mode in the resource found by
515 * @objid. Found locks are added into @cancel list. Returns the amount of
516 * locks added to @cancels list. */
517 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
518 struct list_head *cancels,
519 ldlm_mode_t mode, __u64 lock_flags)
521 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
522 struct ldlm_res_id res_id;
523 struct ldlm_resource *res;
527 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
528 * export) but disabled through procfs (flag in NS).
530 * This distinguishes from a case when ELC is not supported originally,
531 * when we still want to cancel locks in advance and just cancel them
532 * locally, without sending any RPC. */
533 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Look up the DLM resource for this object id and collect matching
 * locks into @cancels under a resource reference. */
536 ostid_build_res_name(&oa->o_oi, &res_id);
537 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
541 LDLM_RESOURCE_ADDREF(res);
542 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
543 lock_flags, 0, NULL);
544 LDLM_RESOURCE_DELREF(res);
545 ldlm_resource_putref(res);
/* Completion handler for OST_DESTROY: release this RPC's slot in the
 * destroy-in-flight counter and wake any waiter throttled in
 * osc_can_send_destroy(). */
549 static int osc_destroy_interpret(const struct lu_env *env,
550 struct ptlrpc_request *req, void *data,
553 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
555 atomic_dec(&cli->cl_destroy_in_flight);
556 wake_up(&cli->cl_destroy_waitq);
/* Throttle destroy RPCs to cl_max_rpcs_in_flight: optimistically take a
 * slot with atomic_inc_return(); if over the limit, give the slot back.
 * The inc/dec pair is not atomic as a whole, so a racing completion may
 * require an extra wake_up() of the destroy waitqueue. */
560 static int osc_can_send_destroy(struct client_obd *cli)
562 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
563 cli->cl_max_rpcs_in_flight) {
564 /* The destroy request can be sent */
567 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
568 cli->cl_max_rpcs_in_flight) {
570 * The counter has been modified between the two atomic
573 wake_up(&cli->cl_destroy_waitq);
578 /* Destroy requests can be async always on the client, and we don't even really
579 * care about the return code since the client cannot do anything at all about
581 * When the MDS is unlinking a filename, it saves the file objects into a
582 * recovery llog, and these object records are cancelled when the OST reports
583 * they were destroyed and sync'd to disk (i.e. transaction committed).
584 * If the client dies, or the OST is down when the object should be destroyed,
585 * the records are not cancelled, and when the OST reconnects to the MDS next,
586 * it will retrieve the llog unlink logs and then sends the log cancellation
587 * cookies to the MDS after committing destroy transactions. */
588 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
589 struct obdo *oa, struct obd_trans_info *oti)
591 struct client_obd *cli = &exp->exp_obd->u.cli;
592 struct ptlrpc_request *req;
593 struct ost_body *body;
594 struct list_head cancels = LIST_HEAD_INIT(cancels);
599 CDEBUG(D_INFO, "oa NULL\n");
/* Cancel our own PW locks on the object first (early lock cancel) so
 * the server does not have to revoke them; dirty data is discarded. */
603 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
604 LDLM_FL_DISCARD_DATA);
606 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: release the locks collected for ELC. */
608 ldlm_lock_list_put(&cancels, l_bl_ast, count);
612 osc_set_capa_size(req, &RMF_CAPA1, NULL);
613 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
616 ptlrpc_request_free(req);
620 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
621 ptlrpc_at_set_req_timeout(req);
/* Carry the MDS unlink llog cookie so the OST can cancel the record. */
623 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
624 oa->o_lcookie = *oti->oti_logcookies;
625 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
627 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
629 ptlrpc_request_set_replen(req);
631 /* If osc_destory is for destroying the unlink orphan,
632 * sent from MDT to OST, which should not be blocked here,
633 * because the process might be triggered by ptlrpcd, and
634 * it is not good to block ptlrpcd thread (b=16006)*/
635 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
636 req->rq_interpret_reply = osc_destroy_interpret;
637 if (!osc_can_send_destroy(cli)) {
638 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
642 * Wait until the number of on-going destroy RPCs drops
643 * under max_rpc_in_flight
645 l_wait_event_exclusive(cli->cl_destroy_waitq,
646 osc_can_send_destroy(cli), &lwi);
650 /* Do not wait for response */
651 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Piggy-back cache/grant accounting onto an outgoing obdo: report the
 * client's dirty page count, how much more it would like to dirty
 * (o_undirty), its current grant, and any lost grant.  Sanity-checks
 * the dirty counters against per-client and global limits under
 * cl_loi_list_lock. */
655 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
658 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
/* Caller must not have populated the fields we are about to set. */
660 LASSERT(!(oa->o_valid & bits));
663 spin_lock(&cli->cl_loi_list_lock);
664 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
665 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
666 cli->cl_dirty_max_pages)) {
667 CERROR("dirty %lu - %lu > dirty_max %lu\n",
668 cli->cl_dirty_pages, cli->cl_dirty_transit,
669 cli->cl_dirty_max_pages);
671 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
672 atomic_long_read(&obd_dirty_transit_pages) >
673 (obd_max_dirty_pages + 1))) {
674 /* The atomic_read() allowing the atomic_inc() are
675 * not covered by a lock thus they may safely race and trip
676 * this CERROR() unless we add in a small fudge factor (+1). */
677 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
678 cli->cl_import->imp_obd->obd_name,
679 atomic_long_read(&obd_dirty_pages),
680 atomic_long_read(&obd_dirty_transit_pages),
681 obd_max_dirty_pages);
683 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
685 CERROR("dirty %lu - dirty_max %lu too big???\n",
686 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
/* Normal case: ask for enough grant to keep a full pipeline of RPCs
 * in flight (max_pages_per_rpc * (max_rpcs_in_flight + 1)). */
689 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
691 (cli->cl_max_rpcs_in_flight + 1);
692 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
695 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
696 oa->o_dropped = cli->cl_lost_grant;
697 cli->cl_lost_grant = 0;
698 spin_unlock(&cli->cl_loi_list_lock);
699 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
700 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check one cl_grant_shrink_interval
 * from now. */
704 void osc_update_next_shrink(struct client_obd *cli)
706 cli->cl_next_shrink_grant =
707 cfs_time_shift(cli->cl_grant_shrink_interval);
708 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
709 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under
 * cl_loi_list_lock. */
712 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
714 spin_lock(&cli->cl_loi_list_lock);
715 cli->cl_avail_grant += grant;
716 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server attached to an RPC reply. */
719 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
721 if (body->oa.o_valid & OBD_MD_FLGRANT) {
722 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
723 __osc_update_grant(cli, body->oa.o_grant);
727 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
728 obd_count keylen, void *key, obd_count vallen,
729 void *val, struct ptlrpc_request_set *set);
/* Completion handler for a grant-shrink set_info RPC.  On failure the
 * grant we tried to return (oa->o_grant) is restored locally; on
 * success any grant in the reply body is absorbed. */
731 static int osc_shrink_grant_interpret(const struct lu_env *env,
732 struct ptlrpc_request *req,
735 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
736 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
737 struct ost_body *body;
/* RPC failed: take the shrunk grant back. */
740 __osc_update_grant(cli, oa->o_grant);
744 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
746 osc_update_grant(cli, body);
/* Piggy-back a grant shrink onto an outgoing obdo: give back a quarter
 * of the available grant and flag it with OBD_FL_SHRINK_GRANT, then
 * reschedule the next shrink check. */
752 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
754 spin_lock(&cli->cl_loi_list_lock);
755 oa->o_grant = cli->cl_avail_grant / 4;
756 cli->cl_avail_grant -= oa->o_grant;
757 spin_unlock(&cli->cl_loi_list_lock);
758 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
759 oa->o_valid |= OBD_MD_FLFLAGS;
762 oa->o_flags |= OBD_FL_SHRINK_GRANT;
763 osc_update_next_shrink(cli);
766 /* Shrink the current grant, either from some large amount to enough for a
767 * full set of in-flight RPCs, or if we have already shrunk to that limit
768 * then to enough for a single RPC. This avoids keeping more grant than
769 * needed, and avoids shrinking the grant piecemeal. */
770 static int osc_shrink_grant(struct client_obd *cli)
772 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
773 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
775 spin_lock(&cli->cl_loi_list_lock);
/* Already at (or below) the pipeline target: drop to one RPC's worth. */
776 if (cli->cl_avail_grant <= target_bytes)
777 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
778 spin_unlock(&cli->cl_loi_list_lock);
780 return osc_shrink_grant_to_target(cli, target_bytes);
/* Return grant to the server until only @target_bytes remain, via an
 * async KEY_GRANT_SHRINK set_info RPC.  No-op if we already hold no
 * more than the (clamped) target.  On send failure the grant is
 * restored locally. */
783 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
786 struct ost_body *body;
789 spin_lock(&cli->cl_loi_list_lock);
790 /* Don't shrink if we are already above or below the desired limit
791 * We don't want to shrink below a single RPC, as that will negatively
792 * impact block allocation and long-term performance. */
793 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
794 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
796 if (target_bytes >= cli->cl_avail_grant) {
797 spin_unlock(&cli->cl_loi_list_lock);
800 spin_unlock(&cli->cl_loi_list_lock);
806 osc_announce_cached(cli, &body->oa, 0);
/* Move the excess grant from the client into the request body. */
808 spin_lock(&cli->cl_loi_list_lock);
809 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
810 cli->cl_avail_grant = target_bytes;
811 spin_unlock(&cli->cl_loi_list_lock);
812 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
813 body->oa.o_valid |= OBD_MD_FLFLAGS;
814 body->oa.o_flags = 0;
816 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
817 osc_update_next_shrink(cli);
819 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
820 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
821 sizeof(*body), body, NULL);
/* Send failed: take the grant back locally. */
823 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: requires the
 * server to support OBD_CONNECT_GRANT_SHRINK, the shrink interval to
 * have (nearly) elapsed, a FULL import, and more grant than one RPC's
 * worth. */
828 static int osc_should_shrink_grant(struct client_obd *client)
830 cfs_time_t time = cfs_time_current();
831 cfs_time_t next_shrink = client->cl_next_shrink_grant;
833 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
834 OBD_CONNECT_GRANT_SHRINK) == 0)
/* Allow a small (5-tick) early margin on the shrink deadline. */
837 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
838 /* Get the current RPC size directly, instead of going via:
839 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
840 * Keep comment here so that it can be found by searching. */
841 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
843 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
844 client->cl_avail_grant > brw_size)
/* Not worth shrinking now; push the deadline forward. */
847 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client registered on the
 * grant-shrink list and shrink those that are due. */
852 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
854 struct client_obd *client;
856 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
857 if (osc_should_shrink_grant(client))
858 osc_shrink_grant(client);
/* Register @client with the periodic grant-shrink timeout machinery
 * and arm its first shrink deadline. */
863 static int osc_add_shrink_grant(struct client_obd *client)
867 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
869 osc_grant_shrink_grant_cb, NULL,
870 &client->cl_grant_shrink_list);
872 CERROR("add grant client %s error %d\n",
873 client->cl_import->imp_obd->obd_name, rc);
876 CDEBUG(D_CACHE, "add grant client %s \n",
877 client->cl_import->imp_obd->obd_name);
878 osc_update_next_shrink(client);
/* Unregister @client from the periodic grant-shrink machinery. */
882 static int osc_del_shrink_grant(struct client_obd *client)
884 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the server's connect data at (re)connect:
 * compute cl_avail_grant from ocd_grant and current dirty pages, pick
 * the extent chunk size, and register for grant shrinking when the
 * server supports it. */
888 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
891 * ocd_grant is the total grant amount we're expect to hold: if we've
892 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
893 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
896 * race is tolerable here: if we're evicted, but imp_state already
897 * left EVICTED state, then cl_dirty_pages must be 0 already.
899 spin_lock(&cli->cl_loi_list_lock);
900 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
901 cli->cl_avail_grant = ocd->ocd_grant;
903 cli->cl_avail_grant = ocd->ocd_grant -
904 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
906 if (cli->cl_avail_grant < 0) {
907 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
908 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
909 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
910 /* workaround for servers which do not have the patch from
912 cli->cl_avail_grant = ocd->ocd_grant;
915 /* determine the appropriate chunk size used by osc_extent. */
916 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
917 spin_unlock(&cli->cl_loi_list_lock);
919 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
920 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
921 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
923 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
924 list_empty(&cli->cl_grant_shrink_list))
925 osc_add_shrink_grant(cli);
928 /* We assume that the reason this OSC got a short read is because it read
929 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
930 * via the LOV, and it _knows_ it's reading inside the file, it's just that
931 * this stripe never got written at or beyond this stripe offset yet. */
932 static void handle_short_read(int nob_read, obd_count page_count,
933 struct brw_page **pga)
938 /* skip bytes read OK */
939 while (nob_read > 0) {
940 LASSERT (page_count > 0);
/* The read ended inside this page: zero-fill its tail. */
942 if (pga[i]->count > nob_read) {
943 /* EOF inside this page */
944 ptr = kmap(pga[i]->pg) +
945 (pga[i]->off & ~CFS_PAGE_MASK);
946 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
953 nob_read -= pga[i]->count;
958 /* zero remaining pages */
959 while (page_count-- > 0) {
960 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
961 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return-code vector in a BRW_WRITE reply:
 * fail on a missing/short vector, propagate the first negative rc,
 * reject any non-zero "success" rc, and verify the bulk transferred
 * exactly the requested number of bytes. */
967 static int check_write_rcs(struct ptlrpc_request *req,
968 int requested_nob, int niocount,
969 obd_count page_count, struct brw_page **pga)
974 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
975 sizeof(*remote_rcs) *
977 if (remote_rcs == NULL) {
978 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
982 /* return error if any niobuf was in error */
983 for (i = 0; i < niocount; i++) {
984 if ((int)remote_rcs[i] < 0)
985 return(remote_rcs[i]);
987 if (remote_rcs[i] != 0) {
988 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
989 i, remote_rcs[i], req);
994 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
995 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
996 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are file-contiguous and
 * their flags differ only in bits known to be safe to combine. */
1003 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1005 if (p1->flag != p2->flag) {
1006 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1007 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1008 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1010 /* warn if we try to combine flags that we don't know to be
1011 * safe to combine */
1012 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1013 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1014 "report this at https://jira.hpdd.intel.com/\n",
1015 p1->flag, p2->flag);
1020 return (p1->off + p1->count == p2->off);
/* Compute the bulk-I/O checksum over @nob bytes spread across @pga
 * pages, using the hash algorithm selected by @cksum_type.  Contains
 * two fault-injection hooks: corrupt received data (OST_READ) before
 * hashing, or return a deliberately wrong checksum on send (OST_WRITE). */
1023 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1024 struct brw_page **pga, int opc,
1025 cksum_type_t cksum_type)
1029 struct cfs_crypto_hash_desc *hdesc;
1030 unsigned int bufsize;
1032 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1034 LASSERT(pg_count > 0);
1036 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1037 if (IS_ERR(hdesc)) {
1038 CERROR("Unable to initialize checksum hash %s\n",
1039 cfs_crypto_hash_name(cfs_alg));
1040 return PTR_ERR(hdesc);
1043 while (nob > 0 && pg_count > 0) {
1044 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1046 /* corrupt the data before we compute the checksum, to
1047 * simulate an OST->client data error */
1048 if (i == 0 && opc == OST_READ &&
1049 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1050 unsigned char *ptr = kmap(pga[i]->pg);
1051 int off = pga[i]->off & ~CFS_PAGE_MASK;
1053 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1056 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1057 pga[i]->off & ~CFS_PAGE_MASK,
1059 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1060 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1062 nob -= pga[i]->count;
1067 bufsize = sizeof(cksum);
1068 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1070 /* For sending we only compute the wrong checksum instead
1071 * of corrupting the data so it is still correct on a redo */
1072 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1078 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1079 struct lov_stripe_md *lsm, obd_count page_count,
1080 struct brw_page **pga,
1081 struct ptlrpc_request **reqp,
1082 struct obd_capa *ocapa, int reserve,
1085 struct ptlrpc_request *req;
1086 struct ptlrpc_bulk_desc *desc;
1087 struct ost_body *body;
1088 struct obd_ioobj *ioobj;
1089 struct niobuf_remote *niobuf;
1090 int niocount, i, requested_nob, opc, rc;
1091 struct osc_brw_async_args *aa;
1092 struct req_capsule *pill;
1093 struct brw_page *pg_prev;
1096 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1097 RETURN(-ENOMEM); /* Recoverable */
1098 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1099 RETURN(-EINVAL); /* Fatal */
1101 if ((cmd & OBD_BRW_WRITE) != 0) {
1103 req = ptlrpc_request_alloc_pool(cli->cl_import,
1104 cli->cl_import->imp_rq_pool,
1105 &RQF_OST_BRW_WRITE);
1108 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1113 for (niocount = i = 1; i < page_count; i++) {
1114 if (!can_merge_pages(pga[i - 1], pga[i]))
1118 pill = &req->rq_pill;
1119 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1121 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1122 niocount * sizeof(*niobuf));
1123 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1125 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1127 ptlrpc_request_free(req);
1130 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1131 ptlrpc_at_set_req_timeout(req);
1132 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1134 req->rq_no_retry_einprogress = 1;
1136 desc = ptlrpc_prep_bulk_imp(req, page_count,
1137 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1138 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1142 GOTO(out, rc = -ENOMEM);
1143 /* NB request now owns desc and will free it when it gets freed */
1145 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1146 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1147 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1148 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1150 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1152 obdo_to_ioobj(oa, ioobj);
1153 ioobj->ioo_bufcnt = niocount;
1154 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1155 * that might be send for this request. The actual number is decided
1156 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1157 * "max - 1" for old client compatibility sending "0", and also so the
1158 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1159 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1160 osc_pack_capa(req, body, ocapa);
1161 LASSERT(page_count > 0);
1163 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1164 struct brw_page *pg = pga[i];
1165 int poff = pg->off & ~CFS_PAGE_MASK;
1167 LASSERT(pg->count > 0);
1168 /* make sure there is no gap in the middle of page array */
1169 LASSERTF(page_count == 1 ||
1170 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1171 ergo(i > 0 && i < page_count - 1,
1172 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1173 ergo(i == page_count - 1, poff == 0)),
1174 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1175 i, page_count, pg, pg->off, pg->count);
1176 LASSERTF(i == 0 || pg->off > pg_prev->off,
1177 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1178 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1180 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1181 pg_prev->pg, page_private(pg_prev->pg),
1182 pg_prev->pg->index, pg_prev->off);
1183 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1184 (pg->flag & OBD_BRW_SRVLOCK));
1186 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1187 requested_nob += pg->count;
1189 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1191 niobuf->rnb_len += pg->count;
1193 niobuf->rnb_offset = pg->off;
1194 niobuf->rnb_len = pg->count;
1195 niobuf->rnb_flags = pg->flag;
1200 LASSERTF((void *)(niobuf - niocount) ==
1201 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1202 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1203 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1205 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1207 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1208 body->oa.o_valid |= OBD_MD_FLFLAGS;
1209 body->oa.o_flags = 0;
1211 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1214 if (osc_should_shrink_grant(cli))
1215 osc_shrink_grant_local(cli, &body->oa);
1217 /* size[REQ_REC_OFF] still sizeof (*body) */
1218 if (opc == OST_WRITE) {
1219 if (cli->cl_checksum &&
1220 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1221 /* store cl_cksum_type in a local variable since
1222 * it can be changed via lprocfs */
1223 cksum_type_t cksum_type = cli->cl_cksum_type;
1225 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1226 oa->o_flags &= OBD_FL_LOCAL_MASK;
1227 body->oa.o_flags = 0;
1229 body->oa.o_flags |= cksum_type_pack(cksum_type);
1230 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1231 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1235 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1237 /* save this in 'oa', too, for later checking */
1238 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1239 oa->o_flags |= cksum_type_pack(cksum_type);
1241 /* clear out the checksum flag, in case this is a
1242 * resend but cl_checksum is no longer set. b=11238 */
1243 oa->o_valid &= ~OBD_MD_FLCKSUM;
1245 oa->o_cksum = body->oa.o_cksum;
1246 /* 1 RC per niobuf */
1247 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1248 sizeof(__u32) * niocount);
1250 if (cli->cl_checksum &&
1251 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1252 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1253 body->oa.o_flags = 0;
1254 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1255 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1258 ptlrpc_request_set_replen(req);
1260 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1261 aa = ptlrpc_req_async_args(req);
1263 aa->aa_requested_nob = requested_nob;
1264 aa->aa_nio_count = niocount;
1265 aa->aa_page_count = page_count;
1269 INIT_LIST_HEAD(&aa->aa_oaps);
1270 if (ocapa && reserve)
1271 aa->aa_ocapa = capa_get(ocapa);
1274 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1275 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1276 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1277 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1281 ptlrpc_req_finished(req);
/*
 * Diagnose a checksum mismatch reported on an OST_WRITE reply.
 *
 * Recomputes the bulk checksum over the page array @pga and compares it
 * against both the client's original checksum and the server's, to decide
 * where the data most likely changed (client side, in transit, or a
 * checksum-type protocol mismatch), then logs a console error with the
 * file/object extent involved.
 *
 * NOTE(review): this extract omits some original source lines (gaps in the
 * embedded numbering), including the early-return path and some
 * declarations; comments below describe only what is visible.
 */
1285 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1286 __u32 client_cksum, __u32 server_cksum, int nob,
1287 obd_count page_count, struct brw_page **pga,
1288 cksum_type_t client_cksum_type)
1292 cksum_type_t cksum_type;
/* Fast path: server agreed with what we sent. */
1294 if (server_cksum == client_cksum) {
1295 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Re-derive the checksum type the server used from oa->o_flags (only
 * meaningful when OBD_MD_FLFLAGS is set) and recompute over our pages. */
1299 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1301 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch for the console report below. */
1304 if (cksum_type != client_cksum_type)
1305 msg = "the server did not use the checksum type specified in "
1306 "the original request - likely a protocol problem";
1307 else if (new_cksum == server_cksum)
1308 msg = "changed on the client after we checksummed it - "
1309 "likely false positive due to mmap IO (bug 11742)";
1310 else if (new_cksum == client_cksum)
1311 msg = "changed in transit before arrival at OST";
1313 msg = "changed in transit AND doesn't match the original - "
1314 "likely false positive due to mmap IO (bug 11742)";
1316 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1317 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1318 msg, libcfs_nid2str(peer->nid),
1319 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1320 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1321 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1322 POSTID(&oa->o_oi), pga[0]->off,
1323 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1324 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1325 "client csum now %x\n", client_cksum, client_cksum_type,
1326 server_cksum, cksum_type, new_cksum);
1330 /* Note rc enters this function as number of bytes transferred */
/*
 * Finish processing a completed BRW (bulk read/write) RPC.
 *
 * Unpacks the reply body, updates quota and grant state, verifies bulk
 * data integrity (write-side via check_write_checksum(), read-side by
 * recomputing the bulk checksum locally), handles short reads, and
 * finally copies the wire obdo back into the async-args obdo.
 *
 * NOTE(review): the extract omits several original lines (error-path
 * returns, some declarations, closing braces); comments describe only
 * the visible logic.
 */
1331 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1333 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1334 const lnet_process_id_t *peer =
1335 &req->rq_import->imp_connection->c_peer;
1336 struct client_obd *cli = aa->aa_cli;
1337 struct ost_body *body;
1338 u32 client_cksum = 0;
/* -EDQUOT still carries a reply body we must look at (quota flags). */
1341 if (rc < 0 && rc != -EDQUOT) {
1342 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1346 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1347 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1349 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1353 /* set/clear over quota flag for a uid/gid */
1354 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1355 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1356 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1358 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1359 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1361 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1364 osc_update_grant(cli, body);
/* Remember the checksum we sent so the write path can cross-check it. */
1369 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1370 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1372 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1374 CERROR("Unexpected +ve rc %d\n", rc);
1377 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1379 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Compare our saved checksum with the one the server computed. */
1382 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1383 check_write_checksum(&body->oa, peer, client_cksum,
1384 body->oa.o_cksum, aa->aa_requested_nob,
1385 aa->aa_page_count, aa->aa_ppga,
1386 cksum_type_unpack(aa->aa_oa->o_flags)))
1389 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1390 aa->aa_page_count, aa->aa_ppga);
1394 /* The rest of this function executes only for OST_READs */
1396 /* if unwrap_bulk failed, return -EAGAIN to retry */
1397 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1399 GOTO(out, rc = -EAGAIN);
/* Sanity-check the byte count the server claims to have sent. */
1401 if (rc > aa->aa_requested_nob) {
1402 CERROR("Unexpected rc %d (%d requested)\n", rc,
1403 aa->aa_requested_nob);
1407 if (rc != req->rq_bulk->bd_nob_transferred) {
1408 CERROR ("Unexpected rc %d (%d transferred)\n",
1409 rc, req->rq_bulk->bd_nob_transferred);
1413 if (rc < aa->aa_requested_nob)
1414 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Read-side checksum verification: recompute locally over what arrived. */
1416 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1417 static int cksum_counter;
1418 u32 server_cksum = body->oa.o_cksum;
1421 cksum_type_t cksum_type;
1423 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1424 body->oa.o_flags : 0);
1425 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1426 aa->aa_ppga, OST_READ,
/* Data may have come via an LNet router; name it in the report. */
1429 if (peer->nid != req->rq_bulk->bd_sender) {
1431 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1434 if (server_cksum != client_cksum) {
1435 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1436 "%s%s%s inode "DFID" object "DOSTID
1437 " extent ["LPU64"-"LPU64"]\n",
1438 req->rq_import->imp_obd->obd_name,
1439 libcfs_nid2str(peer->nid),
1441 body->oa.o_valid & OBD_MD_FLFID ?
1442 body->oa.o_parent_seq : (__u64)0,
1443 body->oa.o_valid & OBD_MD_FLFID ?
1444 body->oa.o_parent_oid : 0,
1445 body->oa.o_valid & OBD_MD_FLFID ?
1446 body->oa.o_parent_ver : 0,
1447 POSTID(&body->oa.o_oi),
1448 aa->aa_ppga[0]->off,
1449 aa->aa_ppga[aa->aa_page_count-1]->off +
1450 aa->aa_ppga[aa->aa_page_count-1]->count -
1452 CERROR("client %x, server %x, cksum_type %x\n",
1453 client_cksum, server_cksum, cksum_type);
1455 aa->aa_oa->o_cksum = client_cksum;
1459 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server did not provide one. */
1462 } else if (unlikely(client_cksum)) {
1463 static int cksum_missed;
/* Rate-limit: log only when cksum_missed is a power of two. */
1466 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1467 CERROR("Checksum %u requested from %s but not sent\n",
1468 cksum_missed, libcfs_nid2str(peer->nid));
1474 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1475 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW RPC after a recoverable error.
 *
 * Prepares a fresh request with the same pages/obdo via
 * osc_brw_prep_request(), transfers the async-args state (oaps, extents,
 * capa) from the old request to the new one, re-points each oap's request
 * reference at the new RPC, and queues it on a ptlrpcd thread.
 *
 * NOTE(review): the extract omits some original lines (early returns,
 * loop-closing braces); comments describe only the visible logic.
 */
1480 static int osc_brw_redo_request(struct ptlrpc_request *request,
1481 struct osc_brw_async_args *aa, int rc)
1483 struct ptlrpc_request *new_req;
1484 struct osc_brw_async_args *new_aa;
1485 struct osc_async_page *oap;
1488 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1489 "redo for recoverable error %d", rc);
1491 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1492 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1493 aa->aa_cli, aa->aa_oa,
1494 NULL /* lsm unused by osc currently */,
1495 aa->aa_page_count, aa->aa_ppga,
1496 &new_req, aa->aa_ocapa, 0, 1);
/* Bail out of the resend if any page's I/O was interrupted. */
1500 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1501 if (oap->oap_request != NULL) {
1502 LASSERTF(request == oap->oap_request,
1503 "request %p != oap_request %p\n",
1504 request, oap->oap_request);
1505 if (oap->oap_interrupted) {
1506 ptlrpc_req_finished(new_req);
1511 /* New request takes over pga and oaps from old request.
1512 * Note that copying a list_head doesn't work, need to move it... */
1514 new_req->rq_interpret_reply = request->rq_interpret_reply;
1515 new_req->rq_async_args = request->rq_async_args;
1516 new_req->rq_commit_cb = request->rq_commit_cb;
1517 /* cap resend delay to the current request timeout, this is similar to
1518 * what ptlrpc does (see after_reply()) */
1519 if (aa->aa_resends > new_req->rq_timeout)
1520 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1522 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1523 new_req->rq_generation_set = 1;
1524 new_req->rq_import_generation = request->rq_import_generation;
1526 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new async args. */
1528 INIT_LIST_HEAD(&new_aa->aa_oaps);
1529 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1530 INIT_LIST_HEAD(&new_aa->aa_exts);
1531 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1532 new_aa->aa_resends = aa->aa_resends;
/* Swap each oap's request reference from the old RPC to the new one. */
1534 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1535 if (oap->oap_request) {
1536 ptlrpc_req_finished(oap->oap_request);
1537 oap->oap_request = ptlrpc_request_addref(new_req);
/* Capa ownership moves to the new request. */
1541 new_aa->aa_ocapa = aa->aa_ocapa;
1542 aa->aa_ocapa = NULL;
1544 /* XXX: This code will run into problem if we're going to support
1545 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1546 * and wait for all of them to be finished. We should inherit request
1547 * set from old request. */
1548 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1550 DEBUG_REQ(D_INFO, new_req, "new request");
1555 * ugh, we want disk allocation on the target to happen in offset order. we'll
1556 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1557 * fine for our small page arrays and doesn't require allocation. its an
1558 * insertion sort that swaps elements that are strides apart, shrinking the
1559 * stride down until its '1' and the array is sorted.
/* Sort @array of @num brw_page pointers by their ->off, ascending
 * (shellsort with Knuth's 3h+1 stride sequence).
 * NOTE(review): some original lines (stride shrink step, inner-loop
 * braces) are missing from this extract. */
1561 static void sort_brw_pages(struct brw_page **array, int num)
1564 struct brw_page *tmp;
/* Grow stride to the largest 3h+1 value below num, then (in the omitted
 * do-loop body) shrink it back down, insertion-sorting at each stride. */
1568 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1573 for (i = stride ; i < num ; i++) {
1576 while (j >= stride && array[j - stride]->off > tmp->off) {
1577 array[j] = array[j - stride];
1582 } while (stride > 1);
/* Free a brw_page pointer array of @count entries that was allocated for
 * a BRW RPC (counterpart of the OBD_ALLOC in osc_build_rpc()). */
1585 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1587 LASSERT(ppga != NULL);
1588 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpreter callback for BRW RPCs (set as rq_interpret_reply).
 *
 * Finishes the request via osc_brw_fini_request(), retries recoverable
 * errors through osc_brw_redo_request(), updates cached object attributes
 * (size/KMS/times/blocks) on success, finishes every extent attached to
 * the RPC, releases the page array, and adjusts the in-flight RPC
 * accounting under cl_loi_list_lock.
 *
 * NOTE(review): the extract omits some original lines (returns, braces,
 * a few statements); comments describe only what is visible.
 */
1591 static int brw_interpret(const struct lu_env *env,
1592 struct ptlrpc_request *req, void *data, int rc)
1594 struct osc_brw_async_args *aa = data;
1595 struct osc_extent *ext;
1596 struct osc_extent *tmp;
1597 struct client_obd *cli = aa->aa_cli;
1600 rc = osc_brw_fini_request(req, rc);
1601 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1602 /* When server return -EINPROGRESS, client should always retry
1603 * regardless of the number of times the bulk was resent already. */
1604 if (osc_recoverable_error(rc)) {
1605 if (req->rq_import_generation !=
1606 req->rq_import->imp_generation) {
1607 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1608 ""DOSTID", rc = %d.\n",
1609 req->rq_import->imp_obd->obd_name,
1610 POSTID(&aa->aa_oa->o_oi), rc);
1611 } else if (rc == -EINPROGRESS ||
1612 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1613 rc = osc_brw_redo_request(req, aa, rc);
1615 CERROR("%s: too many resent retries for object: "
1616 ""LPU64":"LPU64", rc = %d.\n",
1617 req->rq_import->imp_obd->obd_name,
1618 POSTID(&aa->aa_oa->o_oi), rc);
1623 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1628 capa_put(aa->aa_ocapa);
1629 aa->aa_ocapa = NULL;
/* On success, fold reply attributes back into the cl_object cache. */
1633 struct obdo *oa = aa->aa_oa;
1634 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1635 unsigned long valid = 0;
1636 struct cl_object *obj;
1637 struct osc_async_page *last;
1639 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1640 obj = osc2cl(last->oap_obj);
1642 cl_object_attr_lock(obj);
1643 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1644 attr->cat_blocks = oa->o_blocks;
1645 valid |= CAT_BLOCKS;
1647 if (oa->o_valid & OBD_MD_FLMTIME) {
1648 attr->cat_mtime = oa->o_mtime;
1651 if (oa->o_valid & OBD_MD_FLATIME) {
1652 attr->cat_atime = oa->o_atime;
1655 if (oa->o_valid & OBD_MD_FLCTIME) {
1656 attr->cat_ctime = oa->o_ctime;
1660 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1661 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1662 loff_t last_off = last->oap_count + last->oap_obj_off +
1665 /* Change file size if this is an out of quota or
1666 * direct IO write and it extends the file size */
1667 if (loi->loi_lvb.lvb_size < last_off) {
1668 attr->cat_size = last_off;
1671 /* Extend KMS if it's not a lockless write */
1672 if (loi->loi_kms < last_off &&
1673 oap2osc_page(last)->ops_srvlock == 0) {
1674 attr->cat_kms = last_off;
1680 cl_object_attr_update(env, obj, attr, valid);
1681 cl_object_attr_unlock(obj);
1683 OBDO_FREE(aa->aa_oa);
1685 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1686 osc_inc_unstable_pages(req);
/* Hand completion (success or failure) to every extent in this RPC. */
1688 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1689 list_del_init(&ext->oe_link);
1690 osc_extent_finish(env, ext, 1, rc);
1692 LASSERT(list_empty(&aa->aa_exts));
1693 LASSERT(list_empty(&aa->aa_oaps));
1695 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1696 req->rq_bulk->bd_nob_transferred);
1697 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1698 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1700 spin_lock(&cli->cl_loi_list_lock);
1701 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1702 * is called so we know whether to go to sync BRWs or wait for more
1703 * RPCs to complete */
1704 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1705 cli->cl_w_in_flight--;
1707 cli->cl_r_in_flight--;
1708 osc_wake_cache_waiters(cli);
1709 spin_unlock(&cli->cl_loi_list_lock);
1711 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * rq_commit_cb for BRW requests: mark the request committed and, if the
 * unstable-page accounting already ran (rq_unstable set), decrement the
 * unstable page count.  The rq_lock protects the race with
 * osc_inc_unstable_pages() described in the comment below.
 */
1715 static void brw_commit(struct ptlrpc_request *req)
1717 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1718 * this called via the rq_commit_cb, I need to ensure
1719 * osc_dec_unstable_pages is still called. Otherwise unstable
1720 * pages may be leaked. */
1721 spin_lock(&req->rq_lock);
1722 if (likely(req->rq_unstable)) {
1723 req->rq_unstable = 0;
1724 spin_unlock(&req->rq_lock);
1726 osc_dec_unstable_pages(req);
1728 req->rq_committed = 1;
1729 spin_unlock(&req->rq_lock);
1734 * Build an RPC by the list of extent @ext_list. The caller must ensure
1735 * that the total pages in this list are NOT over max pages per RPC.
1736 * Extents in the list must be in OES_RPC state.
/*
 * NOTE(review): the extract omits many original lines (allocation of oa,
 * error branches, some loop bodies and braces); comments describe only
 * the visible logic.  On success the built request is handed to ptlrpcd;
 * on failure every extent in @ext_list is finished with the error.
 */
1738 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1739 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1741 struct ptlrpc_request *req = NULL;
1742 struct osc_extent *ext;
1743 struct brw_page **pga = NULL;
1744 struct osc_brw_async_args *aa = NULL;
1745 struct obdo *oa = NULL;
1746 struct osc_async_page *oap;
1747 struct osc_async_page *tmp;
1748 struct cl_req *clerq = NULL;
1749 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1751 struct cl_req_attr *crattr = NULL;
1752 obd_off starting_offset = OBD_OBJECT_EOF;
1753 obd_off ending_offset = 0;
1757 bool soft_sync = false;
1760 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1761 struct ost_body *body;
1763 LASSERT(!list_empty(ext_list));
1765 /* add pages into rpc_list to build BRW rpc */
1766 list_for_each_entry(ext, ext_list, oe_link) {
1767 LASSERT(ext->oe_state == OES_RPC);
1768 mem_tight |= ext->oe_memalloc;
1769 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1771 list_add_tail(&oap->oap_rpc_item, &rpc_list);
/* Track the byte range [starting_offset, ending_offset) covered. */
1772 if (starting_offset > oap->oap_obj_off)
1773 starting_offset = oap->oap_obj_off;
1775 LASSERT(oap->oap_page_off == 0);
1776 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1777 ending_offset = oap->oap_obj_off +
1780 LASSERT(oap->oap_page_off + oap->oap_count ==
1785 soft_sync = osc_over_unstable_soft_limit(cli);
1787 mpflag = cfs_memory_pressure_get_and_set();
1789 OBD_ALLOC(crattr, sizeof(*crattr));
1791 GOTO(out, rc = -ENOMEM);
1793 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1795 GOTO(out, rc = -ENOMEM);
1799 GOTO(out, rc = -ENOMEM);
/* Populate the brw_page array and attach each page to the cl_req. */
1802 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1803 struct cl_page *page = oap2cl_page(oap);
1804 if (clerq == NULL) {
1805 clerq = cl_req_alloc(env, page, crt,
1806 1 /* only 1-object rpcs for now */);
1808 GOTO(out, rc = PTR_ERR(clerq));
1811 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1813 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1814 pga[i] = &oap->oap_brw_page;
1815 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1816 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1817 pga[i]->pg, page_index(oap->oap_page), oap,
1820 cl_req_page_add(env, clerq, page);
1823 /* always get the data for the obdo for the rpc */
1824 LASSERT(clerq != NULL);
1825 crattr->cra_oa = oa;
1826 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1828 rc = cl_req_prep(env, clerq);
1830 CERROR("cl_req_prep failed: %d\n", rc);
/* Server wants pages in offset order for contiguous disk allocation. */
1834 sort_brw_pages(pga, page_count);
1835 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1836 pga, &req, crattr->cra_capa, 1, 0);
1838 CERROR("prep_req failed: %d\n", rc);
1842 req->rq_commit_cb = brw_commit;
1843 req->rq_interpret_reply = brw_interpret;
1846 req->rq_memalloc = 1;
1848 /* Need to update the timestamps after the request is built in case
1849 * we race with setattr (locally or in queue at OST). If OST gets
1850 * later setattr before earlier BRW (as determined by the request xid),
1851 * the OST will not use BRW timestamps. Sadly, there is no obvious
1852 * way to do this in a single call. bug 10150 */
1853 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1854 crattr->cra_oa = &body->oa;
1855 cl_req_attr_set(env, clerq, crattr,
1856 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1858 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Hand the oap and extent lists over to the request's async args. */
1860 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1861 aa = ptlrpc_req_async_args(req);
1862 INIT_LIST_HEAD(&aa->aa_oaps);
1863 list_splice_init(&rpc_list, &aa->aa_oaps);
1864 INIT_LIST_HEAD(&aa->aa_exts);
1865 list_splice_init(ext_list, &aa->aa_exts);
1866 aa->aa_clerq = clerq;
1868 /* queued sync pages can be torn down while the pages
1869 * were between the pending list and the rpc */
1871 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1872 /* only one oap gets a request reference */
1875 if (oap->oap_interrupted && !req->rq_intr) {
1876 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1878 ptlrpc_mark_interrupted(req);
1882 tmp->oap_request = ptlrpc_request_addref(req);
/* Account the new RPC in flight and feed the lprocfs histograms. */
1884 spin_lock(&cli->cl_loi_list_lock);
1885 starting_offset >>= PAGE_CACHE_SHIFT;
1886 if (cmd == OBD_BRW_READ) {
1887 cli->cl_r_in_flight++;
1888 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1889 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1890 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1891 starting_offset + 1);
1893 cli->cl_w_in_flight++;
1894 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1895 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1896 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1897 starting_offset + 1);
1899 spin_unlock(&cli->cl_loi_list_lock);
1901 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1902 page_count, aa, cli->cl_r_in_flight,
1903 cli->cl_w_in_flight);
1905 /* XXX: Maybe the caller can check the RPC bulk descriptor to
1906 * see which CPU/NUMA node the majority of pages were allocated
1907 * on, and try to assign the async RPC to the CPU core
1908 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1910 * But on the other hand, we expect that multiple ptlrpcd
1911 * threads and the initial write sponsor can run in parallel,
1912 * especially when data checksum is enabled, which is CPU-bound
1913 * operation and single ptlrpcd thread cannot process in time.
1914 * So more ptlrpcd threads sharing BRW load
1915 * (with PDL_POLICY_ROUND) seems better.
1917 ptlrpcd_add_req(req, pol, -1);
/* Error/cleanup path: free scratch state and fail the extents. */
1923 cfs_memory_pressure_restore(mpflag);
1925 if (crattr != NULL) {
1926 capa_put(crattr->cra_capa);
1927 OBD_FREE(crattr, sizeof(*crattr));
1931 LASSERT(req == NULL);
1936 OBD_FREE(pga, sizeof(*pga) * page_count);
1937 /* this should happen rarely and is pretty bad, it makes the
1938 * pending list not follow the dirty order */
1939 while (!list_empty(ext_list)) {
1940 ext = list_entry(ext_list->next, struct osc_extent,
1942 list_del_init(&ext->oe_link);
1943 osc_extent_finish(env, ext, 0, rc);
1945 if (clerq && !IS_ERR(clerq))
1946 cl_req_completion(env, clerq, rc);
/*
 * Attach @einfo->ei_cbdata to @lock's l_ast_data if it is unset or
 * already equal, after asserting that the lock's callbacks and type match
 * what the enqueue info expects.  Runs under the lock's resource lock.
 * NOTE(review): the success/failure return values fall in lines omitted
 * from this extract.
 */
1951 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1952 struct ldlm_enqueue_info *einfo)
1954 void *data = einfo->ei_cbdata;
1957 LASSERT(lock != NULL);
1958 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1959 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1960 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1961 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1963 lock_res_and_lock(lock);
/* Only claim the ast_data slot when free; matching data also succeeds. */
1965 if (lock->l_ast_data == NULL)
1966 lock->l_ast_data = data;
1967 if (lock->l_ast_data == data)
1970 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve
 * @lockh to a lock, set its ast_data, and drop the reference.  A NULL
 * lock (stale handle) is reported as a likely client eviction.
 */
1975 static int osc_set_data_with_check(struct lustre_handle *lockh,
1976 struct ldlm_enqueue_info *einfo)
1978 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1982 set = osc_set_lock_data_with_check(lock, einfo);
1983 LDLM_LOCK_PUT(lock);
1985 CERROR("lockh %p, data %p - client evicted?\n",
1986 lockh, einfo->ei_cbdata);
/*
 * Common completion for an OSC lock enqueue: translate an intent-aborted
 * reply into its real error code, mark the LVB ready when appropriate,
 * invoke the caller's @upcall, and drop the enqueue reference taken by
 * ldlm_cli_enqueue() for matched/granted locks.
 */
1990 static int osc_enqueue_fini(struct ptlrpc_request *req,
1991 osc_enqueue_upcall_f upcall, void *cookie,
1992 struct lustre_handle *lockh, ldlm_mode_t mode,
1993 __u64 *flags, int agl, int errcode)
1995 bool intent = *flags & LDLM_FL_HAS_INTENT;
1999 /* The request was created before ldlm_cli_enqueue call. */
2000 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2001 struct ldlm_reply *rep;
2003 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2004 LASSERT(rep != NULL);
/* The server's intent disposition carries the real result. */
2006 rep->lock_policy_res1 =
2007 ptlrpc_status_ntoh(rep->lock_policy_res1);
2008 if (rep->lock_policy_res1)
2009 errcode = rep->lock_policy_res1;
2011 *flags |= LDLM_FL_LVB_READY;
2012 } else if (errcode == ELDLM_OK) {
2013 *flags |= LDLM_FL_LVB_READY;
2016 /* Call the update callback. */
2017 rc = (*upcall)(cookie, lockh, errcode);
2019 /* release the reference taken in ldlm_cli_enqueue() */
2020 if (errcode == ELDLM_LOCK_MATCHED)
2022 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2023 ldlm_lock_decref(lockh, mode);
/*
 * Reply interpreter for asynchronous OSC lock enqueues.
 *
 * Re-resolves the lock from the saved handle, takes an extra reference to
 * order any blocking AST after the upcall, completes the ldlm enqueue via
 * ldlm_cli_enqueue_fini(), then runs the OSC-level completion
 * (osc_enqueue_fini()) and drops the extra reference.
 * NOTE(review): some lines (AGL branch setup, local declarations) are
 * omitted from this extract.
 */
2028 static int osc_enqueue_interpret(const struct lu_env *env,
2029 struct ptlrpc_request *req,
2030 struct osc_enqueue_args *aa, int rc)
2032 struct ldlm_lock *lock;
2033 struct lustre_handle *lockh = &aa->oa_lockh;
2034 ldlm_mode_t mode = aa->oa_mode;
2035 struct ost_lvb *lvb = aa->oa_lvb;
2036 __u32 lvb_len = sizeof(*lvb);
2041 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2043 lock = ldlm_handle2lock(lockh);
2044 LASSERTF(lock != NULL,
2045 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2046 lockh->cookie, req, aa);
2048 /* Take an additional reference so that a blocking AST that
2049 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2050 * to arrive after an upcall has been executed by
2051 * osc_enqueue_fini(). */
2052 ldlm_lock_addref(lockh, mode);
2054 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2055 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2057 /* Let CP AST to grant the lock first. */
2058 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL enqueues carry no lvb/flags pointers; use locals instead. */
2061 LASSERT(aa->oa_lvb == NULL);
2062 LASSERT(aa->oa_flags == NULL);
2063 aa->oa_flags = &flags;
2066 /* Complete obtaining the lock procedure. */
2067 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2068 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2070 /* Complete osc stuff. */
2071 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2072 aa->oa_flags, aa->oa_agl, rc);
2074 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2076 ldlm_lock_decref(lockh, mode);
2077 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET as @rqset to
 * osc_enqueue_base() to mean "queue on a ptlrpcd daemon thread instead of
 * a real set" (never dereferenced). */
2081 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2083 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2084 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2085 * other synchronous requests, however keeping some locks and trying to obtain
2086 * others may take a considerable amount of time in a case of ost failure; and
2087 * when other sync requests do not get released lock from a client, the client
2088 * is evicted from the cluster -- such scenarious make the life difficult, so
2089 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on an OST object: first try to match an already
 * cached lock (reusing it via the upcall), otherwise send an LDLM_ENQUEUE
 * RPC, either asynchronously (interpreted by osc_enqueue_interpret()) or
 * synchronously (finished via osc_enqueue_fini()).
 * NOTE(review): several lines (returns, AGL/agl branch bodies, some
 * declarations) are missing from this extract.
 */
2090 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2091 __u64 *flags, ldlm_policy_data_t *policy,
2092 struct ost_lvb *lvb, int kms_valid,
2093 osc_enqueue_upcall_f upcall, void *cookie,
2094 struct ldlm_enqueue_info *einfo,
2095 struct ptlrpc_request_set *rqset, int async, int agl)
2097 struct obd_device *obd = exp->exp_obd;
2098 struct lustre_handle lockh = { 0 };
2099 struct ptlrpc_request *req = NULL;
2100 int intent = *flags & LDLM_FL_HAS_INTENT;
2101 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2106 /* Filesystem lock extents are extended to page boundaries so that
2107 * dealing with the page cache is a little smoother. */
2108 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2109 policy->l_extent.end |= ~CFS_PAGE_MASK;
2112 * kms is not valid when either object is completely fresh (so that no
2113 * locks are cached), or object was evicted. In the latter case cached
2114 * lock cannot be used, because it would prime inode state with
2115 * potentially stale LVB.
2120 /* Next, search for already existing extent locks that will cover us */
2121 /* If we're trying to read, we also search for an existing PW lock. The
2122 * VFS and page cache already protect us locally, so lots of readers/
2123 * writers can share a single PW lock.
2125 * There are problems with conversion deadlocks, so instead of
2126 * converting a read lock to a write lock, we'll just enqueue a new
2129 * At some point we should cancel the read lock instead of making them
2130 * send us a blocking callback, but there are problems with canceling
2131 * locks out from other users right now, too. */
2132 mode = einfo->ei_mode;
2133 if (einfo->ei_mode == LCK_PR)
2135 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2136 einfo->ei_type, policy, mode, &lockh, 0);
2138 struct ldlm_lock *matched;
2140 if (*flags & LDLM_FL_TEST_LOCK)
2143 matched = ldlm_handle2lock(&lockh);
2145 /* AGL enqueues DLM locks speculatively. Therefore if
2146 * it already exists a DLM lock, it wll just inform the
2147 * caller to cancel the AGL process for this stripe. */
2148 ldlm_lock_decref(&lockh, mode);
2149 LDLM_LOCK_PUT(matched);
2151 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2152 *flags |= LDLM_FL_LVB_READY;
2154 /* We already have a lock, and it's referenced. */
2155 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2157 ldlm_lock_decref(&lockh, mode);
2158 LDLM_LOCK_PUT(matched);
2161 ldlm_lock_decref(&lockh, mode);
2162 LDLM_LOCK_PUT(matched);
2167 if (*flags & LDLM_FL_TEST_LOCK)
/* No cached match: build the LDLM_ENQUEUE (with LVB reply buffer). */
2171 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2172 &RQF_LDLM_ENQUEUE_LVB);
2176 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2178 ptlrpc_request_free(req);
2182 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2184 ptlrpc_request_set_replen(req);
2187 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2188 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2190 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2191 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash completion state and let the interpreter finish. */
2194 struct osc_enqueue_args *aa;
2195 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2196 aa = ptlrpc_req_async_args(req);
2198 aa->oa_mode = einfo->ei_mode;
2199 aa->oa_type = einfo->ei_type;
2200 lustre_handle_copy(&aa->oa_lockh, &lockh);
2201 aa->oa_upcall = upcall;
2202 aa->oa_cookie = cookie;
2205 aa->oa_flags = flags;
2208 /* AGL is essentially to enqueue an DLM lock
2209 * in advance, so we don't care about the
2210 * result of AGL enqueue. */
2212 aa->oa_flags = NULL;
2215 req->rq_interpret_reply =
2216 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2217 if (rqset == PTLRPCD_SET)
2218 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2220 ptlrpc_set_add_req(rqset, req);
2221 } else if (intent) {
2222 ptlrpc_req_finished(req);
/* Sync path: finish the enqueue inline. */
2227 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2230 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock covering @policy on @res_id.
 * Extents are page-aligned first; on a match, the lock's ast_data is set
 * to @data (rejecting the match if that fails), and when a PW lock
 * satisfied a PR request the reference is converted PR-for-PW so callers
 * always hold the mode they asked for.
 * NOTE(review): the mode-selection lines before ldlm_lock_match() (where
 * `rc` is seeded with the requested mode(s)) are omitted from this
 * extract.
 */
2235 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2236 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2237 __u64 *flags, void *data, struct lustre_handle *lockh,
2240 struct obd_device *obd = exp->exp_obd;
2241 __u64 lflags = *flags;
2245 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2248 /* Filesystem lock extents are extended to page boundaries so that
2249 * dealing with the page cache is a little smoother */
2250 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2251 policy->l_extent.end |= ~CFS_PAGE_MASK;
2253 /* Next, search for already existing extent locks that will cover us */
2254 /* If we're trying to read, we also search for an existing PW lock. The
2255 * VFS and page cache already protect us locally, so lots of readers/
2256 * writers can share a single PW lock. */
2260 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2261 res_id, type, policy, rc, lockh, unref);
2264 if (!osc_set_data_with_check(lockh, data)) {
2265 if (!(lflags & LDLM_FL_TEST_LOCK))
2266 ldlm_lock_decref(lockh, rc);
/* Matched a PW lock for a PR request: swap the reference modes. */
2270 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2271 ldlm_lock_addref(lockh, LCK_PR);
2272 ldlm_lock_decref(lockh, LCK_PW);
/* Drop a lock reference; group locks are cancelled eagerly on the last
 * decref (they are exclusive and should not linger in the LRU). */
2279 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2283 if (unlikely(mode == LCK_GROUP))
2284 ldlm_lock_decref_and_cancel(lockh, mode);
2286 ldlm_lock_decref(lockh, mode);
/*
 * Reply interpreter for async OST_STATFS: copy the server's obd_statfs
 * into the caller's buffer and invoke the obd_info up-call.  Transport
 * errors on a NODELAY request are passed through without retry.
 */
2291 static int osc_statfs_interpret(const struct lu_env *env,
2292 struct ptlrpc_request *req,
2293 struct osc_async_args *aa, int rc)
2295 struct obd_statfs *msfs;
2299 /* The request has in fact never been sent
2300 * due to issues at a higher level (LOV).
2301 * Exit immediately since the caller is
2302 * aware of the problem and takes care
2303 * of the clean up */
2306 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2307 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2313 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2315 GOTO(out, rc = -EPROTO);
2318 *aa->aa_oi->oi_osfs = *msfs;
2320 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an OST_STATFS RPC asynchronously on @rqset; the reply is handled
 * by osc_statfs_interpret().  NODELAY callers (procfs) get a no-resend,
 * no-delay request to avoid blocking on a dead import.
 * NOTE(review): @max_age is accepted but not visibly used in this
 * extract — see the comment below about passing it in the request.
 */
2324 static int osc_statfs_async(struct obd_export *exp,
2325 struct obd_info *oinfo, __u64 max_age,
2326 struct ptlrpc_request_set *rqset)
2328 struct obd_device *obd = class_exp2obd(exp);
2329 struct ptlrpc_request *req;
2330 struct osc_async_args *aa;
2334 /* We could possibly pass max_age in the request (as an absolute
2335 * timestamp or a "seconds.usec ago") so the target can avoid doing
2336 * extra calls into the filesystem if that isn't necessary (e.g.
2337 * during mount that would help a bit). Having relative timestamps
2338 * is not so great if request processing is slow, while absolute
2339 * timestamps are not ideal because they need time synchronization. */
2340 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2344 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2346 ptlrpc_request_free(req);
2349 ptlrpc_request_set_replen(req);
2350 req->rq_request_portal = OST_CREATE_PORTAL;
2351 ptlrpc_at_set_req_timeout(req);
2353 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2354 /* procfs requests not want stat in wait for avoid deadlock */
2355 req->rq_no_resend = 1;
2356 req->rq_no_delay = 1;
2359 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2360 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2361 aa = ptlrpc_req_async_args(req);
2364 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_STATFS: send the RPC, wait for the reply, and copy the
 * returned obd_statfs into \a osfs (copy line elided from this view).
 * Takes its own reference on the import because the caller may be a
 * lprocfs reader racing with disconnect (see comment below).
 * NOTE(review): interior lines (ENTRY/RETURN, rc checks, *osfs = *msfs,
 * import put, closing braces) are elided; only comments were added. */
2368 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2369 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2371 struct obd_device *obd = class_exp2obd(exp);
2372 struct obd_statfs *msfs;
2373 struct ptlrpc_request *req;
2374 struct obd_import *imp = NULL;
2378 /*Since the request might also come from lprocfs, so we need
2379 *sync this with client_disconnect_export Bug15684*/
2380 down_read(&obd->u.cli.cl_sem);
2381 if (obd->u.cli.cl_import)
2382 imp = class_import_get(obd->u.cli.cl_import);
2383 up_read(&obd->u.cli.cl_sem);
2387 /* We could possibly pass max_age in the request (as an absolute
2388 * timestamp or a "seconds.usec ago") so the target can avoid doing
2389 * extra calls into the filesystem if that isn't necessary (e.g.
2390 * during mount that would help a bit). Having relative timestamps
2391 * is not so great if request processing is slow, while absolute
2392 * timestamps are not ideal because they need time synchronization. */
2393 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The request holds its own import ref once allocated; drop ours. */
2395 class_import_put(imp);
2400 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2402 ptlrpc_request_free(req);
2405 ptlrpc_request_set_replen(req);
2406 req->rq_request_portal = OST_CREATE_PORTAL;
2407 ptlrpc_at_set_req_timeout(req);
2409 if (flags & OBD_STATFS_NODELAY) {
2410 /* procfs requests not want stat in wait for avoid deadlock */
2411 req->rq_no_resend = 1;
2412 req->rq_no_delay = 1;
2415 rc = ptlrpc_queue_wait(req);
2419 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2421 GOTO(out, rc = -EPROTO);
2428 ptlrpc_req_finished(req);
/* ioctl dispatcher for the OSC device.  Pins the module for the duration
 * of the call (try_module_get/module_put) so it cannot be unloaded while
 * an ioctl is in flight.  Unknown commands return -ENOTTY.
 * NOTE(review): the switch statement, several case bodies and break/RETURN
 * lines are elided from this view; only comments were added. */
2432 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2433 void *karg, void *uarg)
2435 struct obd_device *obd = exp->exp_obd;
2436 struct obd_ioctl_data *data = karg;
2440 if (!try_module_get(THIS_MODULE)) {
2441 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2442 module_name(THIS_MODULE));
2446 case OBD_IOC_CLIENT_RECOVER:
2447 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2448 data->ioc_inlbuf1, 0);
2452 case IOC_OSC_SET_ACTIVE:
2453 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2456 case OBD_IOC_POLL_QUOTACHECK:
2457 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2459 case OBD_IOC_PING_TARGET:
2460 err = ptlrpc_obd_ping(obd);
2463 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2464 cmd, current_comm());
2465 GOTO(out, err = -ENOTTY);
/* Balance the try_module_get() above on every exit path. */
2468 module_put(THIS_MODULE);
/* obd_set_info_async handler.  A handful of keys (checksum, sptlrpc
 * config, ctx flush, client cache attach, LRU shrink) are handled purely
 * on the client; everything else is packed into an OST_SET_INFO RPC.
 * KEY_GRANT_SHRINK is the one key sent without a caller-supplied \a set:
 * it goes through ptlrpcd with its own interpret callback.
 * NOTE(review): many interior lines (ENTRY/RETURN, rc checks, braces,
 * OBD_ALLOC for the grant-shrink oa) are elided; only comments added. */
2472 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2473 obd_count keylen, void *key, obd_count vallen,
2474 void *val, struct ptlrpc_request_set *set)
2476 struct ptlrpc_request *req;
2477 struct obd_device *obd = exp->exp_obd;
2478 struct obd_import *imp = class_exp2cliimp(exp);
2483 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Toggle wire checksumming for this client; value is a simple int. */
2485 if (KEY_IS(KEY_CHECKSUM)) {
2486 if (vallen != sizeof(int))
2488 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2492 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2493 sptlrpc_conf_client_adapt(obd);
2497 if (KEY_IS(KEY_FLUSH_CTX)) {
2498 sptlrpc_import_flush_my_ctx(imp);
/* Attach this OSC to a shared client page cache (LLite LRU). */
2502 if (KEY_IS(KEY_CACHE_SET)) {
2503 struct client_obd *cli = &obd->u.cli;
2505 LASSERT(cli->cl_cache == NULL); /* only once */
2506 cli->cl_cache = (struct cl_client_cache *)val;
2507 cl_cache_incref(cli->cl_cache);
2508 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2510 /* add this osc into entity list */
2511 LASSERT(list_empty(&cli->cl_lru_osc));
2512 spin_lock(&cli->cl_cache->ccc_lru_lock);
2513 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2514 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Shrink at most half of the pages currently on the LRU, capped by
 * the caller-requested target. */
2519 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2520 struct client_obd *cli = &obd->u.cli;
2521 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2522 long target = *(long *)val;
2524 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* All remaining keys except grant-shrink require a request set. */
2529 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2532 /* We pass all other commands directly to OST. Since nobody calls osc
2533 methods directly and everybody is supposed to go through LOV, we
2534 assume lov checked invalid values for us.
2535 The only recognised values so far are evict_by_nid and mds_conn.
2536 Even if something bad goes through, we'd get a -EINVAL from OST
2539 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2540 &RQF_OST_SET_GRANT_INFO :
2545 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2546 RCL_CLIENT, keylen);
/* Grant-shrink carries its value in an ost_body, not RMF_SETINFO_VAL. */
2547 if (!KEY_IS(KEY_GRANT_SHRINK))
2548 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2549 RCL_CLIENT, vallen);
2550 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2552 ptlrpc_request_free(req);
2556 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2557 memcpy(tmp, key, keylen);
2558 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2561 memcpy(tmp, val, vallen);
2563 if (KEY_IS(KEY_GRANT_SHRINK)) {
2564 struct osc_grant_args *aa;
2567 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2568 aa = ptlrpc_req_async_args(req);
/* Allocation failure path (alloc line elided): drop the request. */
2571 ptlrpc_req_finished(req);
2574 *oa = ((struct ost_body *)val)->oa;
2576 req->rq_interpret_reply = osc_shrink_grant_interpret;
2579 ptlrpc_request_set_replen(req);
2580 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2581 LASSERT(set != NULL);
2582 ptlrpc_set_add_req(set, req);
2583 ptlrpc_check_set(NULL, set);
/* Grant-shrink path: let ptlrpcd drive the request instead. */
2585 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Reconnect callback: recompute the grant amount to request from the OST.
 * We ask for our currently-known available grant plus grant covering
 * dirty pages; if that totals zero, fall back to 2x the client BRW size.
 * Lost grant is reset here and only reported via the debug message.
 * NOTE(review): ENTRY/RETURN and closing-brace lines are elided from
 * this view; only comments were added. */
2590 static int osc_reconnect(const struct lu_env *env,
2591 struct obd_export *exp, struct obd_device *obd,
2592 struct obd_uuid *cluuid,
2593 struct obd_connect_data *data,
2596 struct client_obd *cli = &obd->u.cli;
2598 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* cl_loi_list_lock protects the grant accounting fields. */
2601 spin_lock(&cli->cl_loi_list_lock);
2602 data->ocd_grant = (cli->cl_avail_grant +
2603 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2604 2 * cli_brw_size(obd);
2605 lost_grant = cli->cl_lost_grant;
2606 cli->cl_lost_grant = 0;
2607 spin_unlock(&cli->cl_loi_list_lock);
2609 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2610 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2611 data->ocd_version, data->ocd_grant, lost_grant);
/* Disconnect this OSC's export; afterwards, once the import is known to
 * be destroyed (cl_import == NULL), remove the client from the grant
 * shrink list.  Ordering rationale is in the comment block below (BUG18662).
 * NOTE(review): ENTRY/RETURN and closing braces are elided from this
 * view; only comments were added. */
2617 static int osc_disconnect(struct obd_export *exp)
2619 struct obd_device *obd = class_exp2obd(exp);
2622 rc = client_disconnect_export(exp);
2624 * Initially we put del_shrink_grant before disconnect_export, but it
2625 * causes the following problem if setup (connect) and cleanup
2626 * (disconnect) are tangled together.
2627 * connect p1 disconnect p2
2628 * ptlrpc_connect_import
2629 * ............... class_manual_cleanup
2632 * ptlrpc_connect_interrupt
2634 * add this client to shrink list
2636 * Bang! pinger trigger the shrink.
2637 * So the osc should be disconnected from the shrink list, after we
2638 * are sure the import has been destroyed. BUG18662
2640 if (obd->u.cli.cl_import == NULL)
2641 osc_del_shrink_grant(&obd->u.cli);
/* React to import state transitions: reset grant on disconnect, flush
 * outstanding I/O and local locks on invalidation, re-initialize grant
 * and the request portal on OCD (connect-data) negotiation, and forward
 * (in)active/(de)activate events to the observer (typically LOV).
 * NOTE(review): the switch statement, break lines, rc checks and closing
 * braces are elided from this view; only comments were added. */
2645 static int osc_import_event(struct obd_device *obd,
2646 struct obd_import *imp,
2647 enum obd_import_event event)
2649 struct client_obd *cli;
2653 LASSERT(imp->imp_obd == obd);
2656 case IMP_EVENT_DISCON: {
/* Server connection lost: all previously granted space is void. */
2658 spin_lock(&cli->cl_loi_list_lock);
2659 cli->cl_avail_grant = 0;
2660 cli->cl_lost_grant = 0;
2661 spin_unlock(&cli->cl_loi_list_lock);
2664 case IMP_EVENT_INACTIVE: {
2665 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2668 case IMP_EVENT_INVALIDATE: {
2669 struct ldlm_namespace *ns = obd->obd_namespace;
2673 env = cl_env_get(&refcheck);
2677 /* all pages go to failing rpcs due to the invalid
2679 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
/* Drop local locks only; the server-side state is already gone. */
2681 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2682 cl_env_put(env, &refcheck);
2687 case IMP_EVENT_ACTIVE: {
2688 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2691 case IMP_EVENT_OCD: {
2692 struct obd_connect_data *ocd = &imp->imp_connect_data;
2694 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2695 osc_init_grant(&obd->u.cli, ocd);
2698 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2699 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2701 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2704 case IMP_EVENT_DEACTIVATE: {
2705 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2708 case IMP_EVENT_ACTIVATE: {
2709 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2713 CERROR("Unknown import event %d\n", event);
2720 * Determine whether the lock can be canceled before replaying the lock
2721 * during recovery, see bug16774 for detailed information.
2723 * \retval zero the lock can't be canceled
2724 * \retval other ok to cancel
2726 static int osc_cancel_weight(struct ldlm_lock *lock)
2729 * Cancel all unused and granted extent lock.
/* Only granted extent locks whose weight (page/user references —
 * see osc_ldlm_weigh_ast) is zero are safe to cancel. */
2731 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2732 lock->l_granted_mode == lock->l_req_mode &&
2733 osc_ldlm_weigh_ast(lock) == 0)
/* ptlrpcd work callback: flush this client's pending BRW (writeback)
 * pages by unplugging its I/O queues.
 * NOTE(review): the RETURN line is elided from this view. */
2739 static int brw_queue_work(const struct lu_env *env, void *data)
2741 struct client_obd *cli = data;
2743 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2745 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/* Device setup for an OSC obd: take a ptlrpcd reference, do generic
 * client setup, create the writeback and LRU ptlrpcd work items, set up
 * quota, procfs entries (attaching under osp's typ_procsym if an OSP of
 * the same name registered first), the request pool, and register the
 * grant-shrink list and cancel-weight callback.  Error paths unwind in
 * reverse (labels mostly elided from this view).
 * NOTE(review): ENTRY/RETURN, several rc checks, goto labels and closing
 * braces are elided; only comments were added. */
2749 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2751 struct client_obd *cli = &obd->u.cli;
2752 struct obd_type *type;
2757 rc = ptlrpcd_addref();
2761 rc = client_obd_setup(obd, lcfg);
2763 GOTO(out_ptlrpcd, rc);
2765 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2766 if (IS_ERR(handler))
2767 GOTO(out_client_setup, rc = PTR_ERR(handler));
2768 cli->cl_writeback_work = handler;
2770 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2771 if (IS_ERR(handler))
2772 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2773 cli->cl_lru_work = handler;
2775 rc = osc_quota_setup(obd);
2777 GOTO(out_ptlrpcd_work, rc);
2779 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2781 #ifdef CONFIG_PROC_FS
2782 obd->obd_vars = lprocfs_osc_obd_vars;
2784 /* If this is true then both client (osc) and server (osp) are on the
2785 * same node. The osp layer if loaded first will register the osc proc
2786 * directory. In that case this obd_device will be attached its proc
2787 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2788 type = class_search_type(LUSTRE_OSP_NAME);
2789 if (type && type->typ_procsym) {
2790 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2792 obd->obd_vars, obd);
2793 if (IS_ERR(obd->obd_proc_entry)) {
2794 rc = PTR_ERR(obd->obd_proc_entry);
/* procfs failure is logged but deliberately non-fatal. */
2795 CERROR("error %d setting up lprocfs for %s\n", rc,
2797 obd->obd_proc_entry = NULL;
2800 rc = lprocfs_obd_setup(obd);
2803 /* If the basic OSC proc tree construction succeeded then
2804 * lets do the rest. */
2806 lproc_osc_attach_seqstat(obd);
2807 sptlrpc_lprocfs_cliobd_attach(obd);
2808 ptlrpc_lprocfs_register_obd(obd);
2811 /* We need to allocate a few requests more, because
2812 * brw_interpret tries to create new requests before freeing
2813 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2814 * reserved, but I'm afraid that might be too much wasted RAM
2815 * in fact, so 2 is just my guess and still should work. */
2816 cli->cl_import->imp_rq_pool =
2817 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2819 ptlrpc_add_rqs_to_pool);
2821 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2822 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* --- error unwind: destroy whatever work items were created --- */
2826 if (cli->cl_writeback_work != NULL) {
2827 ptlrpcd_destroy_work(cli->cl_writeback_work);
2828 cli->cl_writeback_work = NULL;
2830 if (cli->cl_lru_work != NULL) {
2831 ptlrpcd_destroy_work(cli->cl_lru_work);
2832 cli->cl_lru_work = NULL;
2835 client_obd_cleanup(obd);
/* Staged pre-cleanup: at OBD_CLEANUP_EARLY deactivate the import and
 * stop pinging it; at OBD_CLEANUP_EXPORTS tear down the ptlrpcd work
 * items, the client import and the procfs entries.
 * NOTE(review): the switch statement, break lines and RETURN are elided
 * from this view; only comments were added. */
2841 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2847 case OBD_CLEANUP_EARLY: {
2848 struct obd_import *imp;
2849 imp = obd->u.cli.cl_import;
2850 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2851 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2852 ptlrpc_deactivate_import(imp);
2853 spin_lock(&imp->imp_lock);
2854 imp->imp_pingable = 0;
2855 spin_unlock(&imp->imp_lock);
2858 case OBD_CLEANUP_EXPORTS: {
2859 struct client_obd *cli = &obd->u.cli;
2861 * for echo client, export may be on zombie list, wait for
2862 * zombie thread to cull it, because cli.cl_import will be
2863 * cleared in client_disconnect_export():
2864 * class_export_destroy() -> obd_cleanup() ->
2865 * echo_device_free() -> echo_client_cleanup() ->
2866 * obd_disconnect() -> osc_disconnect() ->
2867 * client_disconnect_export()
2869 obd_zombie_barrier();
2870 if (cli->cl_writeback_work) {
2871 ptlrpcd_destroy_work(cli->cl_writeback_work);
2872 cli->cl_writeback_work = NULL;
2874 if (cli->cl_lru_work) {
2875 ptlrpcd_destroy_work(cli->cl_lru_work);
2876 cli->cl_lru_work = NULL;
2878 obd_cleanup_client_import(obd);
2879 ptlrpc_lprocfs_unregister_obd(obd);
2880 lprocfs_obd_cleanup(obd);
/* Final cleanup: detach this OSC from the shared client cache LRU list
 * (dropping the cache reference taken in osc_set_info_async's
 * KEY_CACHE_SET handling), release the quota cache, and finish generic
 * client teardown.
 * NOTE(review): ENTRY/RETURN and some interior lines are elided from
 * this view; only comments were added. */
2887 int osc_cleanup(struct obd_device *obd)
2889 struct client_obd *cli = &obd->u.cli;
2895 if (cli->cl_cache != NULL) {
2896 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2897 spin_lock(&cli->cl_cache->ccc_lru_lock);
2898 list_del_init(&cli->cl_lru_osc);
2899 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2900 cli->cl_lru_left = NULL;
2901 cl_cache_decref(cli->cl_cache);
2902 cli->cl_cache = NULL;
2905 /* free memory of osc quota cache */
2906 osc_quota_cleanup(obd);
2908 rc = client_obd_cleanup(obd);
/* Apply a configuration record to this OSC's proc parameters.
 * class_process_proc_param() returns >0 on "handled"; normalize that
 * to 0 so callers only see 0 or a negative errno. */
2914 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2916 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2917 return rc > 0 ? 0: rc;
/* obd_ops adapter: \a buf carries the lustre_cfg; \a len is unused. */
2920 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2922 return osc_process_config_base(obd, buf);
/* Method table wiring the OSC into the generic obd framework; generic
 * client_* helpers are used where no OSC-specific behavior is needed. */
2925 static struct obd_ops osc_obd_ops = {
2926 .o_owner = THIS_MODULE,
2927 .o_setup = osc_setup,
2928 .o_precleanup = osc_precleanup,
2929 .o_cleanup = osc_cleanup,
2930 .o_add_conn = client_import_add_conn,
2931 .o_del_conn = client_import_del_conn,
2932 .o_connect = client_connect_import,
2933 .o_reconnect = osc_reconnect,
2934 .o_disconnect = osc_disconnect,
2935 .o_statfs = osc_statfs,
2936 .o_statfs_async = osc_statfs_async,
2937 .o_create = osc_create,
2938 .o_destroy = osc_destroy,
2939 .o_getattr = osc_getattr,
2940 .o_setattr = osc_setattr,
2941 .o_setattr_async = osc_setattr_async,
2942 .o_iocontrol = osc_iocontrol,
2943 .o_set_info_async = osc_set_info_async,
2944 .o_import_event = osc_import_event,
2945 .o_process_config = osc_process_config,
2946 .o_quotactl = osc_quotactl,
2947 .o_quotacheck = osc_quotacheck,
/* Module init: create the lu_kmem slab caches and register the OSC obd
 * type.  Proc support is disabled when an OSP type with typ_procsym has
 * already claimed the proc directory (osc and osp on the same node).
 * NOTE(review): RETURN and some rc-check lines are elided from this
 * view; only comments were added. */
2950 static int __init osc_init(void)
2952 bool enable_proc = true;
2953 struct obd_type *type;
2957 /* print an address of _any_ initialized kernel symbol from this
2958 * module, to allow debugging with gdb that doesn't support data
2959 * symbols from modules.*/
2960 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2962 rc = lu_kmem_init(osc_caches);
2966 type = class_search_type(LUSTRE_OSP_NAME);
2967 if (type != NULL && type->typ_procsym != NULL)
2968 enable_proc = false;
2970 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2971 LUSTRE_OSC_NAME, &osc_device_type);
/* Registration failed: tear the caches back down. */
2973 lu_kmem_fini(osc_caches);
/* Module exit: unregister the obd type and free the slab caches, in
 * reverse order of osc_init(). */
2980 static void /*__exit*/ osc_exit(void)
2982 class_unregister_type(LUSTRE_OSC_NAME);
2983 lu_kmem_fini(osc_caches);
/* Kernel module metadata and init/exit registration (cfs_module is the
 * libcfs wrapper around module_init/module_exit). */
2986 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2987 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2988 MODULE_LICENSE("GPL");
2990 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);