 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <libcfs/libcfs.h>
37 #include <lprocfs_status.h>
38 #include <lustre_debug.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_ha.h>
42 #include <uapi/linux/lustre/lustre_ioctl.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <uapi/linux/lustre/lustre_param.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
51 #include "osc_internal.h"
/*
 * Shared OSC request-pool state: a pre-allocated pool of ptlrpc requests
 * used so BRW writes can proceed under memory pressure.
 */
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
/* read-only module parameter (0444): tunable only at module load time */
59 module_param(osc_reqpool_mem_max, uint, 0444);
/*
 * Per-BRW-RPC async context, stored in the request's rq_async_args and
 * consumed by brw_interpret().  Also reused (aliased below) as the
 * argument block for grant-shrink RPCs.
 */
61 struct osc_brw_async_args {
67 struct brw_page **aa_ppga;
68 struct client_obd *aa_cli;
69 struct list_head aa_oaps;
70 struct list_head aa_exts;
/* grant RPCs carry the same layout; only aa_oa is used by the shrink path */
73 #define osc_grant_args osc_brw_async_args
/* Async-setattr context: upcall invoked from osc_setattr_interpret(). */
75 struct osc_setattr_args {
77 obd_enqueue_update_f sa_upcall;
/* Async-fsync (OST_SYNC) context consumed by osc_sync_interpret(). */
81 struct osc_fsync_args {
82 struct osc_object *fa_obj;
84 obd_enqueue_update_f fa_upcall;
/* Async-ladvise (OST_LADVISE) context consumed by osc_ladvise_interpret(). */
88 struct osc_ladvise_args {
90 obd_enqueue_update_f la_upcall;
/*
 * Context for an async DLM lock enqueue: export, lock type/mode, the
 * caller's upcall, the LVB to fill from the reply, and the lock handle.
 */
94 struct osc_enqueue_args {
95 struct obd_export *oa_exp;
96 enum ldlm_type oa_type;
97 enum ldlm_mode oa_mode;
99 osc_enqueue_upcall_f oa_upcall;
101 struct ost_lvb *oa_lvb;
102 struct lustre_handle oa_lockh;
/* Forward declarations for helpers defined later in this file. */
106 static void osc_release_ppga(struct brw_page **ppga, size_t count);
107 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Copy @oa into the OST_BODY field of @req's request buffer, converting
 * the obdo to wire format for the peer via lustre_set_wire_obdo().
 */
110 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
112 struct ost_body *body;
114 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
117 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/*
 * Synchronous OST_GETATTR: pack @oa, wait for the reply with
 * ptlrpc_queue_wait(), and copy the returned attributes back into @oa.
 * The OST does not maintain a blocksize, so o_blksize is filled in
 * locally from cli_brw_size().
 */
120 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
123 struct ptlrpc_request *req;
124 struct ost_body *body;
128 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
132 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failed: request was never sent, free it directly */
134 ptlrpc_request_free(req);
138 osc_pack_req_body(req, oa);
140 ptlrpc_request_set_replen(req);
142 rc = ptlrpc_queue_wait(req);
146 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply buffer missing/short: protocol error */
148 GOTO(out, rc = -EPROTO);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
153 oa->o_blksize = cli_brw_size(exp->exp_obd);
154 oa->o_valid |= OBD_MD_FLBLKSZ;
158 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: send @oa's attributes to the OST and refresh
 * @oa from the reply.  Caller must have set the object group (FLGROUP).
 */
163 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
166 struct ptlrpc_request *req;
167 struct ost_body *body;
171 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
173 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
177 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* pack failed: request was never sent, free it directly */
179 ptlrpc_request_free(req);
183 osc_pack_req_body(req, oa);
185 ptlrpc_request_set_replen(req);
187 rc = ptlrpc_queue_wait(req);
191 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193 GOTO(out, rc = -EPROTO);
195 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
199 ptlrpc_req_finished(req);
/*
 * Reply handler shared by async setattr and punch: unpack the OST_BODY
 * from the reply into sa_oa, then invoke the caller's upcall with the
 * final status.
 */
204 static int osc_setattr_interpret(const struct lu_env *env,
205 struct ptlrpc_request *req,
206 struct osc_setattr_args *sa, int rc)
208 struct ost_body *body;
214 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216 GOTO(out, rc = -EPROTO);
218 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* upcall always runs, even on error, so the caller can clean up */
221 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR.  With no rqset the request is fired and
 * forgotten via ptlrpcd; otherwise the interpret callback + upcall are
 * wired up and the request is handed to ptlrpcd (PTLRPCD_SET) or added
 * to the caller's set.
 */
225 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
226 obd_enqueue_update_f upcall, void *cookie,
227 struct ptlrpc_request_set *rqset)
229 struct ptlrpc_request *req;
230 struct osc_setattr_args *sa;
235 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
239 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
241 ptlrpc_request_free(req);
245 osc_pack_req_body(req, oa);
247 ptlrpc_request_set_replen(req);
249 /* do mds to ost setattr asynchronously */
251 /* Do not wait for response. */
252 ptlrpcd_add_req(req);
254 req->rq_interpret_reply =
255 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async-args area must be large enough to hold our context */
257 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
258 sa = ptlrpc_req_async_args(req);
260 sa->sa_upcall = upcall;
261 sa->sa_cookie = cookie;
263 if (rqset == PTLRPCD_SET)
264 ptlrpcd_add_req(req);
266 ptlrpc_set_add_req(rqset, req);
/*
 * Reply handler for OST_LADVISE: copy the reply obdo back to the caller
 * and invoke the upcall with the final status.
 */
272 static int osc_ladvise_interpret(const struct lu_env *env,
273 struct ptlrpc_request *req,
276 struct osc_ladvise_args *la = arg;
277 struct ost_body *body;
283 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
285 GOTO(out, rc = -EPROTO);
/* struct copy: give the caller the server's view of the obdo */
287 *la->la_oa = body->oa;
289 rc = la->la_upcall(la->la_cookie, rc);
294 * If rqset is NULL, do not wait for response. Upcall and cookie could also
295 * be NULL in this case
/*
 * Send an OST_LADVISE RPC carrying @num_advise lu_ladvise entries from
 * @ladvise_hdr.  Goes to the OST I/O portal with adaptive timeouts.
 * Dispatch rules mirror osc_setattr_async(): NULL rqset -> fire and
 * forget; PTLRPCD_SET -> ptlrpcd; otherwise added to @rqset.
 */
297 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
298 struct ladvise_hdr *ladvise_hdr,
299 obd_enqueue_update_f upcall, void *cookie,
300 struct ptlrpc_request_set *rqset)
302 struct ptlrpc_request *req;
303 struct ost_body *body;
304 struct osc_ladvise_args *la;
306 struct lu_ladvise *req_ladvise;
307 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
308 int num_advise = ladvise_hdr->lah_count;
309 struct ladvise_hdr *req_ladvise_hdr;
312 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* the ladvise buffer is variable-length: size it before packing */
316 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
317 num_advise * sizeof(*ladvise));
318 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
320 ptlrpc_request_free(req);
323 req->rq_request_portal = OST_IO_PORTAL;
324 ptlrpc_at_set_req_timeout(req);
326 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
328 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
331 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
332 &RMF_OST_LADVISE_HDR);
333 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
335 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
336 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
337 ptlrpc_request_set_replen(req);
340 /* Do not wait for response. */
341 ptlrpcd_add_req(req);
345 req->rq_interpret_reply = osc_ladvise_interpret;
346 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
347 la = ptlrpc_req_async_args(req);
349 la->la_upcall = upcall;
350 la->la_cookie = cookie;
352 if (rqset == PTLRPCD_SET)
353 ptlrpcd_add_req(req);
355 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE.  Only echo-client objects are created through
 * this path (see the fid_seq_is_echo assertion); regular OST object
 * creation is handled elsewhere.  On success @oa is refreshed from the
 * reply and o_blksize is filled in locally.
 */
360 static int osc_create(const struct lu_env *env, struct obd_export *exp,
363 struct ptlrpc_request *req;
364 struct ost_body *body;
369 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
370 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
372 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
374 GOTO(out, rc = -ENOMEM);
376 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
378 ptlrpc_request_free(req);
382 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
385 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
387 ptlrpc_request_set_replen(req);
389 rc = ptlrpc_queue_wait(req);
393 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
395 GOTO(out_req, rc = -EPROTO);
397 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
398 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
400 oa->o_blksize = cli_brw_size(exp->exp_obd);
401 oa->o_valid |= OBD_MD_FLBLKSZ;
403 CDEBUG(D_HA, "transno: %lld\n",
404 lustre_msg_get_transno(req->rq_repmsg));
406 ptlrpc_req_finished(req);
/*
 * Asynchronous OST_PUNCH (truncate/hole-punch).  The start/end of the
 * punched range travel inside @oa.  Reuses osc_setattr_interpret() to
 * deliver the reply to the caller's upcall.  Dispatch rules mirror
 * osc_setattr_async().
 */
411 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
412 obd_enqueue_update_f upcall, void *cookie,
413 struct ptlrpc_request_set *rqset)
415 struct ptlrpc_request *req;
416 struct osc_setattr_args *sa;
417 struct ost_body *body;
421 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
425 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
427 ptlrpc_request_free(req);
430 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
431 ptlrpc_at_set_req_timeout(req);
433 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
435 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
437 ptlrpc_request_set_replen(req);
439 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
440 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
441 sa = ptlrpc_req_async_args(req);
443 sa->sa_upcall = upcall;
444 sa->sa_cookie = cookie;
445 if (rqset == PTLRPCD_SET)
446 ptlrpcd_add_req(req);
448 ptlrpc_set_add_req(rqset, req);
/*
 * Reply handler for OST_SYNC: copy the reply obdo to the caller, update
 * the osc object's cached blocks attribute under the attr lock, then
 * invoke the upcall with the final status.
 */
453 static int osc_sync_interpret(const struct lu_env *env,
454 struct ptlrpc_request *req,
457 struct osc_fsync_args *fa = arg;
458 struct ost_body *body;
459 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
460 unsigned long valid = 0;
461 struct cl_object *obj;
467 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
469 CERROR("can't unpack ost_body\n");
470 GOTO(out, rc = -EPROTO);
473 *fa->fa_oa = body->oa;
474 obj = osc2cl(fa->fa_obj);
476 /* Update osc object's blocks attribute */
477 cl_object_attr_lock(obj);
478 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
479 attr->cat_blocks = body->oa.o_blocks;
484 cl_object_attr_update(env, obj, attr, valid);
485 cl_object_attr_unlock(obj);
488 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Asynchronous OST_SYNC for @obj.  The byte range to sync is carried in
 * @oa's size/blocks fields (overloaded, see the in-body comment).
 * Dispatch rules mirror osc_setattr_async().
 */
492 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
493 obd_enqueue_update_f upcall, void *cookie,
494 struct ptlrpc_request_set *rqset)
496 struct obd_export *exp = osc_export(obj);
497 struct ptlrpc_request *req;
498 struct ost_body *body;
499 struct osc_fsync_args *fa;
503 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
507 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
509 ptlrpc_request_free(req);
513 /* overload the size and blocks fields in the oa with start/end */
514 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
516 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
518 ptlrpc_request_set_replen(req);
519 req->rq_interpret_reply = osc_sync_interpret;
521 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
522 fa = ptlrpc_req_async_args(req);
525 fa->fa_upcall = upcall;
526 fa->fa_cookie = cookie;
528 if (rqset == PTLRPCD_SET)
529 ptlrpcd_add_req(req);
531 ptlrpc_set_add_req(rqset, req);
536 /* Find and cancel locally locks matched by @mode in the resource found by
537 * @objid. Found locks are added into @cancel list. Returns the amount of
538 * locks added to @cancels list. */
539 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
540 struct list_head *cancels,
541 enum ldlm_mode mode, __u64 lock_flags)
543 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
544 struct ldlm_res_id res_id;
545 struct ldlm_resource *res;
549 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
550 * export) but disabled through procfs (flag in NS).
552 * This distinguishes from a case when ELC is not supported originally,
553 * when we still want to cancel locks in advance and just cancel them
554 * locally, without sending any RPC. */
555 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
558 ostid_build_res_name(&oa->o_oi, &res_id);
559 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug reference over the local-cancel scan */
563 LDLM_RESOURCE_ADDREF(res);
564 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565 lock_flags, 0, NULL);
566 LDLM_RESOURCE_DELREF(res);
567 ldlm_resource_putref(res);
/*
 * Reply handler for OST_DESTROY: drop the in-flight-destroy count and
 * wake anyone throttled in osc_destroy().
 */
571 static int osc_destroy_interpret(const struct lu_env *env,
572 struct ptlrpc_request *req, void *data,
575 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
577 atomic_dec(&cli->cl_destroy_in_flight);
578 wake_up(&cli->cl_destroy_waitq);
/*
 * Throttle destroy RPCs to cl_max_rpcs_in_flight: optimistically bump
 * the in-flight counter; if that exceeds the limit, back it out and
 * (if another destroy completed in the window) re-wake waiters so no
 * wakeup is lost.
 */
582 static int osc_can_send_destroy(struct client_obd *cli)
584 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
585 cli->cl_max_rpcs_in_flight) {
586 /* The destroy request can be sent */
589 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
590 cli->cl_max_rpcs_in_flight) {
592 * The counter has been modified between the two atomic
595 wake_up(&cli->cl_destroy_waitq);
/*
 * OST_DESTROY: cancel matching local PW locks in advance (ELC, with
 * LDLM_FL_DISCARD_DATA), pack the cancels into the request, throttle
 * against cl_max_rpcs_in_flight, then fire via ptlrpcd without waiting
 * for the reply.
 */
600 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
603 struct client_obd *cli = &exp->exp_obd->u.cli;
604 struct ptlrpc_request *req;
605 struct ost_body *body;
606 struct list_head cancels = LIST_HEAD_INIT(cancels);
611 CDEBUG(D_INFO, "oa NULL\n");
615 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
616 LDLM_FL_DISCARD_DATA);
618 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* alloc failed: release the locks collected for early cancel */
620 ldlm_lock_list_put(&cancels, l_bl_ast, count);
624 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
627 ptlrpc_request_free(req);
631 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
632 ptlrpc_at_set_req_timeout(req);
634 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
636 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
638 ptlrpc_request_set_replen(req);
640 req->rq_interpret_reply = osc_destroy_interpret;
641 if (!osc_can_send_destroy(cli)) {
642 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
645 * Wait until the number of on-going destroy RPCs drops
646 * under max_rpc_in_flight
648 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
649 osc_can_send_destroy(cli), &lwi);
/* interrupted while throttled: drop the request */
651 ptlrpc_req_finished(req);
656 /* Do not wait for response */
657 ptlrpcd_add_req(req);
/*
 * Fill the dirty/undirty/grant/dropped accounting fields of @oa under
 * cl_loi_list_lock so every RPC tells the OST how much cache the client
 * holds and how much more grant it wants.  o_undirty is sized for a
 * full pipeline of RPCs (max_pages_per_rpc * (max_rpcs_in_flight + 1)),
 * plus per-extent tax when the server supports GRANT_PARAM.
 */
661 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
664 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
666 LASSERT(!(oa->o_valid & bits));
669 spin_lock(&cli->cl_loi_list_lock);
670 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
671 oa->o_dirty = cli->cl_dirty_grant;
673 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
/* sanity checks: dirty accounting should never exceed the limits */
674 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
675 cli->cl_dirty_max_pages)) {
676 CERROR("dirty %lu - %lu > dirty_max %lu\n",
677 cli->cl_dirty_pages, cli->cl_dirty_transit,
678 cli->cl_dirty_max_pages);
680 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
681 atomic_long_read(&obd_dirty_transit_pages) >
682 (long)(obd_max_dirty_pages + 1))) {
683 /* The atomic_read() allowing the atomic_inc() are
684 * not covered by a lock thus they may safely race and trip
685 * this CERROR() unless we add in a small fudge factor (+1). */
686 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
687 cli_name(cli), atomic_long_read(&obd_dirty_pages),
688 atomic_long_read(&obd_dirty_transit_pages),
689 obd_max_dirty_pages);
691 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
693 CERROR("dirty %lu - dirty_max %lu too big???\n",
694 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
697 unsigned long nrpages;
699 nrpages = cli->cl_max_pages_per_rpc;
700 nrpages *= cli->cl_max_rpcs_in_flight + 1;
701 nrpages = max(nrpages, cli->cl_dirty_max_pages);
702 oa->o_undirty = nrpages << PAGE_SHIFT;
703 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
707 /* take extent tax into account when asking for more
709 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
710 cli->cl_max_extent_pages;
711 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
714 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
715 oa->o_dropped = cli->cl_lost_grant;
/* lost grant has been reported; reset the counter */
716 cli->cl_lost_grant = 0;
717 spin_unlock(&cli->cl_loi_list_lock);
718 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
719 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Schedule the next grant-shrink attempt cl_grant_shrink_interval
 * seconds from now.
 */
722 void osc_update_next_shrink(struct client_obd *cli)
724 cli->cl_next_shrink_grant =
725 cfs_time_shift(cli->cl_grant_shrink_interval);
726 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
727 cli->cl_next_shrink_grant);
/* Add @grant bytes to the available grant, under cl_loi_list_lock. */
730 static void __osc_update_grant(struct client_obd *cli, u64 grant)
732 spin_lock(&cli->cl_loi_list_lock);
733 cli->cl_avail_grant += grant;
734 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
737 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
739 if (body->oa.o_valid & OBD_MD_FLGRANT) {
740 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
741 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Reply handler for a grant-shrink set_info RPC.  On failure the grant
 * we tried to give back is restored locally; on success any grant the
 * server returned in the reply body is absorbed.
 */
745 static int osc_shrink_grant_interpret(const struct lu_env *env,
746 struct ptlrpc_request *req,
749 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
750 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
751 struct ost_body *body;
/* RPC failed: take back the grant we attempted to release */
754 __osc_update_grant(cli, oa->o_grant);
758 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
760 osc_update_grant(cli, body);
/*
 * Piggy-back a grant shrink on an outgoing BRW: give back a quarter of
 * the available grant in @oa, flag it with OBD_FL_SHRINK_GRANT, and
 * reschedule the next shrink.
 */
766 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
768 spin_lock(&cli->cl_loi_list_lock);
769 oa->o_grant = cli->cl_avail_grant / 4;
770 cli->cl_avail_grant -= oa->o_grant;
771 spin_unlock(&cli->cl_loi_list_lock);
772 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
773 oa->o_valid |= OBD_MD_FLFLAGS;
776 oa->o_flags |= OBD_FL_SHRINK_GRANT;
777 osc_update_next_shrink(cli);
780 /* Shrink the current grant, either from some large amount to enough for a
781 * full set of in-flight RPCs, or if we have already shrunk to that limit
782 * then to enough for a single RPC. This avoids keeping more grant than
783 * needed, and avoids shrinking the grant piecemeal. */
784 static int osc_shrink_grant(struct client_obd *cli)
786 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
787 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
789 spin_lock(&cli->cl_loi_list_lock);
/* already at/below the pipeline target: shrink further, to one RPC */
790 if (cli->cl_avail_grant <= target_bytes)
791 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
792 spin_unlock(&cli->cl_loi_list_lock);
794 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Release grant down to @target_bytes by sending a KEY_GRANT_SHRINK
 * set_info RPC carrying the surplus in the obdo.  The target is clamped
 * to at least one full RPC; nothing is sent if we already hold less
 * than the target.
 */
797 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
800 struct ost_body *body;
803 spin_lock(&cli->cl_loi_list_lock);
804 /* Don't shrink if we are already above or below the desired limit
805 * We don't want to shrink below a single RPC, as that will negatively
806 * impact block allocation and long-term performance. */
807 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
808 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
810 if (target_bytes >= cli->cl_avail_grant) {
811 spin_unlock(&cli->cl_loi_list_lock);
814 spin_unlock(&cli->cl_loi_list_lock);
820 osc_announce_cached(cli, &body->oa, 0);
822 spin_lock(&cli->cl_loi_list_lock);
/* the surplus travels in o_grant; local accounting drops to target now */
823 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
824 cli->cl_avail_grant = target_bytes;
825 spin_unlock(&cli->cl_loi_list_lock);
826 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
827 body->oa.o_valid |= OBD_MD_FLFLAGS;
828 body->oa.o_flags = 0;
830 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
831 osc_update_next_shrink(cli);
833 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
834 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
835 sizeof(*body), body, NULL);
/* send failed: restore the grant we tried to give back */
837 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether a grant shrink is due: the server must support
 * OBD_CONNECT_GRANT_SHRINK, the shrink deadline must have (nearly)
 * passed, the import must be FULL, and we must hold more grant than a
 * single RPC needs.
 */
842 static int osc_should_shrink_grant(struct client_obd *client)
844 cfs_time_t time = cfs_time_current();
845 cfs_time_t next_shrink = client->cl_next_shrink_grant;
847 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
848 OBD_CONNECT_GRANT_SHRINK) == 0)
851 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
852 /* Get the current RPC size directly, instead of going via:
853 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
854 * Keep comment here so that it can be found by searching. */
855 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
857 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
858 client->cl_avail_grant > brw_size)
/* deadline passed but nothing to shrink: just re-arm the timer */
861 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk every client on the timeout item's
 * list and shrink grant where osc_should_shrink_grant() says so.
 */
866 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
868 struct client_obd *client;
870 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
871 if (osc_should_shrink_grant(client))
872 osc_shrink_grant(client);
/*
 * Register @client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline.
 */
877 static int osc_add_shrink_grant(struct client_obd *client)
881 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
883 osc_grant_shrink_grant_cb, NULL,
884 &client->cl_grant_shrink_list);
886 CERROR("add grant client %s error %d\n", cli_name(client), rc);
889 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
890 osc_update_next_shrink(client);
/* Unregister @client from the periodic grant-shrink machinery. */
894 static int osc_del_shrink_grant(struct client_obd *client)
896 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize grant accounting from the connect data after (re)connect:
 * set cl_avail_grant from ocd_grant (adjusting for dirty/reserved state
 * unless we were evicted), derive chunk size, extent tax and maximum
 * extent size when the server supports GRANT_PARAM, and enable the
 * periodic grant shrinker if the server supports it.
 */
900 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
903 * ocd_grant is the total grant amount we're expect to hold: if we've
904 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
905 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
908 * race is tolerable here: if we're evicted, but imp_state already
909 * left EVICTED state, then cl_dirty_pages must be 0 already.
911 spin_lock(&cli->cl_loi_list_lock);
912 cli->cl_avail_grant = ocd->ocd_grant;
913 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
914 cli->cl_avail_grant -= cli->cl_reserved_grant;
915 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
916 cli->cl_avail_grant -= cli->cl_dirty_grant;
918 cli->cl_avail_grant -=
919 cli->cl_dirty_pages << PAGE_SHIFT;
922 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
926 /* overhead for each extent insertion */
927 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
928 /* determine the appropriate chunk size used by osc_extent. */
929 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
930 ocd->ocd_grant_blkbits);
931 /* max_pages_per_rpc must be chunk aligned */
932 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
933 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
934 ~chunk_mask) & chunk_mask;
935 /* determine maximum extent size, in #pages */
936 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
937 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
938 if (cli->cl_max_extent_pages == 0)
939 cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM support: fall back to page-sized chunks */
941 cli->cl_grant_extent_tax = 0;
942 cli->cl_chunkbits = PAGE_SHIFT;
943 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
945 spin_unlock(&cli->cl_loi_list_lock);
947 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
948 "chunk bits: %d cl_max_extent_pages: %d\n",
950 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
951 cli->cl_max_extent_pages);
953 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
954 list_empty(&cli->cl_grant_shrink_list))
955 osc_add_shrink_grant(cli);
957 EXPORT_SYMBOL(osc_init_grant);
959 /* We assume that the reason this OSC got a short read is because it read
960 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
961 * via the LOV, and it _knows_ it's reading inside the file, it's just that
962 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * Zero-fill the tail of a short read: skip the @nob_read bytes that
 * arrived, zero the remainder of the partially-filled page, then zero
 * every page after it.  Pages are touched via kmap().
 */
963 static void handle_short_read(int nob_read, size_t page_count,
964 struct brw_page **pga)
969 /* skip bytes read OK */
970 while (nob_read > 0) {
971 LASSERT (page_count > 0);
973 if (pga[i]->count > nob_read) {
974 /* EOF inside this page */
975 ptr = kmap(pga[i]->pg) +
976 (pga[i]->off & ~PAGE_MASK);
977 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
984 nob_read -= pga[i]->count;
989 /* zero remaining pages */
990 while (page_count-- > 0) {
991 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
992 memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf return-code vector in a BRW_WRITE reply:
 * propagate the first negative rc, reject non-zero "success" codes as
 * protocol errors, and verify the bulk transferred exactly the number
 * of bytes requested.
 */
998 static int check_write_rcs(struct ptlrpc_request *req,
999 int requested_nob, int niocount,
1000 size_t page_count, struct brw_page **pga)
1005 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1006 sizeof(*remote_rcs) *
1008 if (remote_rcs == NULL) {
1009 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1013 /* return error if any niobuf was in error */
1014 for (i = 0; i < niocount; i++) {
1015 if ((int)remote_rcs[i] < 0)
1016 return(remote_rcs[i]);
1018 if (remote_rcs[i] != 0) {
1019 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1020 i, remote_rcs[i], req);
1025 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1026 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1027 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can be merged into one niobuf when they are
 * byte-contiguous (p1 ends where p2 starts) and their flags differ only
 * in bits known to be safe to combine; unknown flag differences are
 * warned about.
 */
1034 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1036 if (p1->flag != p2->flag) {
1037 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1038 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1039 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1041 /* warn if we try to combine flags that we don't know to be
1042 * safe to combine */
1043 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1044 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1045 "report this at https://jira.hpdd.intel.com/\n",
1046 p1->flag, p2->flag);
1051 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk checksum over @nob bytes spread across @pga using
 * the algorithm selected by @cksum_type.  Contains two fault-injection
 * hooks: CHECKSUM_RECEIVE corrupts read data before hashing,
 * CHECKSUM_SEND makes a write checksum deliberately wrong without
 * touching the data.
 */
1054 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1055 struct brw_page **pga, int opc,
1056 enum cksum_types cksum_type)
1060 struct cfs_crypto_hash_desc *hdesc;
1061 unsigned int bufsize;
1062 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1064 LASSERT(pg_count > 0);
1066 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1067 if (IS_ERR(hdesc)) {
1068 CERROR("Unable to initialize checksum hash %s\n",
1069 cfs_crypto_hash_name(cfs_alg));
1070 return PTR_ERR(hdesc);
1073 while (nob > 0 && pg_count > 0) {
/* only hash the bytes of this page that belong to the transfer */
1074 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1076 /* corrupt the data before we compute the checksum, to
1077 * simulate an OST->client data error */
1078 if (i == 0 && opc == OST_READ &&
1079 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1080 unsigned char *ptr = kmap(pga[i]->pg);
1081 int off = pga[i]->off & ~PAGE_MASK;
1083 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1086 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1087 pga[i]->off & ~PAGE_MASK,
1089 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1090 (int)(pga[i]->off & ~PAGE_MASK));
1092 nob -= pga[i]->count;
1097 bufsize = sizeof(cksum);
1098 cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1100 /* For sending we only compute the wrong checksum instead
1101 * of corrupting the data so it is still correct on a redo */
1102 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a complete BRW (bulk read/write) RPC for @page_count pages:
 * allocate the request (from the write pool for writes), count mergeable
 * pages into niobufs, size and pack the capsule, attach a bulk
 * descriptor, fill the obdo/ioobj/niobuf wire structures, announce
 * cached/dirty state, optionally compute a client-side bulk checksum,
 * and stash the async-args context.  *reqp receives the request; the
 * caller sends it.
 */
1109 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1110 u32 page_count, struct brw_page **pga,
1111 struct ptlrpc_request **reqp, int resend)
1113 struct ptlrpc_request *req;
1114 struct ptlrpc_bulk_desc *desc;
1115 struct ost_body *body;
1116 struct obd_ioobj *ioobj;
1117 struct niobuf_remote *niobuf;
1118 int niocount, i, requested_nob, opc, rc;
1119 struct osc_brw_async_args *aa;
1120 struct req_capsule *pill;
1121 struct brw_page *pg_prev;
1124 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1125 RETURN(-ENOMEM); /* Recoverable */
1126 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1127 RETURN(-EINVAL); /* Fatal */
1129 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes draw from the pre-allocated pool to survive memory pressure */
1131 req = ptlrpc_request_alloc_pool(cli->cl_import,
1133 &RQF_OST_BRW_WRITE);
1136 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count niobufs: adjacent mergeable pages share one niobuf */
1141 for (niocount = i = 1; i < page_count; i++) {
1142 if (!can_merge_pages(pga[i - 1], pga[i]))
1146 pill = &req->rq_pill;
1147 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1149 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1150 niocount * sizeof(*niobuf));
1152 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1154 ptlrpc_request_free(req);
1157 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1158 ptlrpc_at_set_req_timeout(req);
1159 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1161 req->rq_no_retry_einprogress = 1;
1163 desc = ptlrpc_prep_bulk_imp(req, page_count,
1164 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1165 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1166 PTLRPC_BULK_PUT_SINK) |
1167 PTLRPC_BULK_BUF_KIOV,
1169 &ptlrpc_bulk_kiov_pin_ops);
1172 GOTO(out, rc = -ENOMEM);
1173 /* NB request now owns desc and will free it when it gets freed */
1175 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1176 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1177 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1178 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1180 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1182 obdo_to_ioobj(oa, ioobj);
1183 ioobj->ioo_bufcnt = niocount;
1184 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1185 * that might be send for this request. The actual number is decided
1186 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1187 * "max - 1" for old client compatibility sending "0", and also so the
1188 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1189 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1190 LASSERT(page_count > 0);
1192 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1193 struct brw_page *pg = pga[i];
1194 int poff = pg->off & ~PAGE_MASK;
1196 LASSERT(pg->count > 0);
1197 /* make sure there is no gap in the middle of page array */
1198 LASSERTF(page_count == 1 ||
1199 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1200 ergo(i > 0 && i < page_count - 1,
1201 poff == 0 && pg->count == PAGE_SIZE) &&
1202 ergo(i == page_count - 1, poff == 0)),
1203 "i: %d/%d pg: %p off: %llu, count: %u\n",
1204 i, page_count, pg, pg->off, pg->count);
1205 LASSERTF(i == 0 || pg->off > pg_prev->off,
1206 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1207 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1209 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1210 pg_prev->pg, page_private(pg_prev->pg),
1211 pg_prev->pg->index, pg_prev->off);
1212 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1213 (pg->flag & OBD_BRW_SRVLOCK));
1215 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1216 requested_nob += pg->count;
/* mergeable with the previous page: extend the current niobuf */
1218 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1220 niobuf->rnb_len += pg->count;
1222 niobuf->rnb_offset = pg->off;
1223 niobuf->rnb_len = pg->count;
1224 niobuf->rnb_flags = pg->flag;
1229 LASSERTF((void *)(niobuf - niocount) ==
1230 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1231 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1232 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1234 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1236 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1237 body->oa.o_valid |= OBD_MD_FLFLAGS;
1238 body->oa.o_flags = 0;
/* mark resends so the server can tell retried bulk from fresh ones */
1240 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1243 if (osc_should_shrink_grant(cli))
1244 osc_shrink_grant_local(cli, &body->oa);
1246 /* size[REQ_REC_OFF] still sizeof (*body) */
1247 if (opc == OST_WRITE) {
1248 if (cli->cl_checksum &&
1249 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1250 /* store cl_cksum_type in a local variable since
1251 * it can be changed via lprocfs */
1252 enum cksum_types cksum_type = cli->cl_cksum_type;
1254 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1255 body->oa.o_flags = 0;
1257 body->oa.o_flags |= cksum_type_pack(cksum_type);
1258 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1259 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1263 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1265 /* save this in 'oa', too, for later checking */
1266 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1267 oa->o_flags |= cksum_type_pack(cksum_type);
1269 /* clear out the checksum flag, in case this is a
1270 * resend but cl_checksum is no longer set. b=11238 */
1271 oa->o_valid &= ~OBD_MD_FLCKSUM;
1273 oa->o_cksum = body->oa.o_cksum;
1274 /* 1 RC per niobuf */
1275 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1276 sizeof(__u32) * niocount);
1278 if (cli->cl_checksum &&
1279 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1280 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1281 body->oa.o_flags = 0;
1282 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1283 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1286 /* Client cksum has been already copied to wire obdo in previous
1287 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1288 * resent due to cksum error, this will allow Server to
1289 * check+dump pages on its side */
1291 ptlrpc_request_set_replen(req)
1293 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1294 aa = ptlrpc_req_async_args(req);
1296 aa->aa_requested_nob = requested_nob;
1297 aa->aa_nio_count = niocount;
1298 aa->aa_page_count = page_count;
1302 INIT_LIST_HEAD(&aa->aa_oaps);
1305 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1306 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1307 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1308 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1312 ptlrpc_req_finished(req);
/* Path buffer for the checksum-error page dump produced by
 * dump_all_bulk_pages().  File scope so the (potentially large) PATH_MAX
 * array is not placed on the stack.
 * NOTE(review): no lock visibly guards it; concurrent checksum errors
 * could race on the buffer contents - confirm against full source. */
1316 char dbgcksum_file_name[PATH_MAX];
/*
 * Dump the data pages of a bulk RPC that failed its checksum to a debug
 * file so the corruption can be analyzed offline.
 *
 * The file name encodes the debug path, the parent FID (when
 * OBD_MD_FLFID is valid in @oa), the byte extent covered by @pga and both
 * checksums.  The file is opened O_CREAT|O_EXCL, so only the first error
 * for a given range/fid is recorded; resends of the same range fail the
 * open and are skipped.  Each page is kmap()ed and written via
 * vfs_write(), then the file is fsync'ed and closed.
 */
1318 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1319 struct brw_page **pga, __u32 server_cksum,
1328 /* will only keep dump of pages on first error for the same range in
1329 * file/fid, not during the resends/retries. */
1330 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1331 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
/* fall back to the compiled-in default when no debug path is set */
1332 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1333 libcfs_debug_file_path_arr :
1334 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
/* parent FID fields are only meaningful when OBD_MD_FLFID is set */
1335 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1336 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1337 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1339 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1340 client_cksum, server_cksum);
/* O_EXCL keeps the first dump; later attempts for the same name fail */
1341 filp = filp_open(dbgcksum_file_name,
1342 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1346 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1347 "checksum error: rc = %d\n", dbgcksum_file_name,
1350 CERROR("%s: can't open to dump pages with checksum "
1351 "error: rc = %d\n", dbgcksum_file_name, rc);
/* write out every page of the failed bulk transfer */
1357 for (i = 0; i < page_count; i++) {
1358 len = pga[i]->count;
1359 buf = kmap(pga[i]->pg);
1361 rc = vfs_write(filp, (__force const char __user *)buf,
1364 CERROR("%s: wanted to write %u but got %d "
1365 "error\n", dbgcksum_file_name, len, rc);
1370 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1371 dbgcksum_file_name, rc);
/* push the dump to disk before returning; best-effort on error */
1377 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1379 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1380 filp_close(filp, NULL);
/*
 * Classify a write-checksum mismatch reported by the server.
 *
 * @oa           wire obdo from the server reply (carries server cksum type)
 * @peer         NID of the server that reported the mismatch
 * @client_cksum checksum the client originally sent
 * @server_cksum checksum the server computed over the received data
 * @aa           async args still holding the page array of this BRW
 *
 * Re-computes the checksum over the still-attached pages using the type
 * the server used, then compares all three values to guess *where* the
 * data changed: server used a different cksum type, page changed on the
 * client after checksumming (mmap), or corruption in transit.  Logs a
 * console error with full details.  Returns 0 when the checksums agree.
 * NOTE(review): the non-zero return path is elided from this excerpt -
 * confirm the mismatch return value against the full source.
 */
1385 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1386 __u32 client_cksum, __u32 server_cksum,
1387 struct osc_brw_async_args *aa)
1391 enum cksum_types cksum_type;
/* fast path: server confirmed exactly what we sent */
1393 if (server_cksum == client_cksum) {
1394 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* optionally dump the pages for offline analysis of the corruption */
1398 if (aa->aa_cli->cl_checksum_dump)
1399 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1400 server_cksum, client_cksum);
/* recompute locally with the checksum type the server actually used */
1402 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1404 new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1405 aa->aa_ppga, OST_WRITE, cksum_type);
/* classify the failure for the console message below */
1407 if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1408 msg = "the server did not use the checksum type specified in "
1409 "the original request - likely a protocol problem";
1410 else if (new_cksum == server_cksum)
1411 msg = "changed on the client after we checksummed it - "
1412 "likely false positive due to mmap IO (bug 11742)";
1413 else if (new_cksum == client_cksum)
1414 msg = "changed in transit before arrival at OST";
1416 msg = "changed in transit AND doesn't match the original - "
1417 "likely false positive due to mmap IO (bug 11742)";
1419 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1420 DFID " object "DOSTID" extent [%llu-%llu], original "
1421 "client csum %x (type %x), server csum %x (type %x),"
1422 " client csum now %x\n",
1423 aa->aa_cli->cl_import->imp_obd->obd_name,
1424 msg, libcfs_nid2str(peer->nid),
1425 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1426 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1427 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1428 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1429 aa->aa_ppga[aa->aa_page_count - 1]->off +
1430 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1431 client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1432 server_cksum, cksum_type, new_cksum);
/*
 * Finish a bulk read/write RPC: unpack the reply, update quota/grant
 * state, verify bulk integrity and checksums, and copy the reply obdo
 * back into the async args.
 *
 * @req  the completed BRW request
 * @rc   enters as the number of bytes transferred (or negative error);
 *       leaves as 0 or a negative error (-EAGAIN requests a retry).
 *
 * Write path: verifies per-niobuf RCs and, when checksums were
 * requested, cross-checks the server's checksum via
 * check_write_checksum().  Read path: tolerates short reads via
 * handle_short_read() and recomputes the checksum over received pages.
 */
1436 /* Note rc enters this function as number of bytes transferred */
1437 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1439 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1440 const struct lnet_process_id *peer =
1441 &req->rq_import->imp_connection->c_peer;
1442 struct client_obd *cli = aa->aa_cli;
1443 struct ost_body *body;
1444 u32 client_cksum = 0;
/* -EDQUOT still carries a valid reply body (quota flags below) */
1447 if (rc < 0 && rc != -EDQUOT) {
1448 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1452 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1453 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1455 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1459 /* set/clear over quota flag for a uid/gid/projid */
1460 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1461 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1462 unsigned qid[LL_MAXQUOTAS] = {
1463 body->oa.o_uid, body->oa.o_gid,
1464 body->oa.o_projid };
1465 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1466 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1467 body->oa.o_valid, body->oa.o_flags);
1468 osc_quota_setdq(cli, qid, body->oa.o_valid,
/* refresh the client's grant accounting from the reply */
1472 osc_update_grant(cli, body);
/* remember the checksum we sent; the reply obdo will overwrite aa_oa */
1477 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1478 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1480 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* writes must transfer exactly what was requested */
1482 CERROR("Unexpected +ve rc %d\n", rc);
1485 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1487 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* server disagreed with our checksum: diagnose and log */
1490 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1491 check_write_checksum(&body->oa, peer, client_cksum,
1492 body->oa.o_cksum, aa))
1495 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1496 aa->aa_page_count, aa->aa_ppga);
1500 /* The rest of this function executes only for OST_READs */
1502 /* if unwrap_bulk failed, return -EAGAIN to retry */
1503 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1505 GOTO(out, rc = -EAGAIN);
/* sanity: cannot receive more than was asked for */
1507 if (rc > aa->aa_requested_nob) {
1508 CERROR("Unexpected rc %d (%d requested)\n", rc,
1509 aa->aa_requested_nob);
/* reply byte count must match what the bulk layer saw */
1513 if (rc != req->rq_bulk->bd_nob_transferred) {
1514 CERROR ("Unexpected rc %d (%d transferred)\n",
1515 rc, req->rq_bulk->bd_nob_transferred);
/* short read (e.g. EOF): zero-fill the tail pages */
1519 if (rc < aa->aa_requested_nob)
1520 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1522 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1523 static int cksum_counter;
1524 u32 server_cksum = body->oa.o_cksum;
1527 enum cksum_types cksum_type;
/* use the checksum type the server packed into o_flags, if any */
1529 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1530 body->oa.o_flags : 0);
1531 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1532 aa->aa_ppga, OST_READ,
/* bulk came through a router: name it in any error message */
1535 if (peer->nid != req->rq_bulk->bd_sender) {
1537 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1540 if (server_cksum != client_cksum) {
1541 struct ost_body *clbody;
1542 u32 page_count = aa->aa_page_count;
1544 clbody = req_capsule_client_get(&req->rq_pill,
1546 if (cli->cl_checksum_dump)
1547 dump_all_bulk_pages(&clbody->oa, page_count,
1548 aa->aa_ppga, server_cksum,
1551 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1552 "%s%s%s inode "DFID" object "DOSTID
1553 " extent [%llu-%llu], client %x, "
1554 "server %x, cksum_type %x\n",
1555 req->rq_import->imp_obd->obd_name,
1556 libcfs_nid2str(peer->nid),
1558 clbody->oa.o_valid & OBD_MD_FLFID ?
1559 clbody->oa.o_parent_seq : 0ULL,
1560 clbody->oa.o_valid & OBD_MD_FLFID ?
1561 clbody->oa.o_parent_oid : 0,
1562 clbody->oa.o_valid & OBD_MD_FLFID ?
1563 clbody->oa.o_parent_ver : 0,
1564 POSTID(&body->oa.o_oi),
1565 aa->aa_ppga[0]->off,
1566 aa->aa_ppga[page_count-1]->off +
1567 aa->aa_ppga[page_count-1]->count - 1,
1568 client_cksum, server_cksum,
1571 aa->aa_oa->o_cksum = client_cksum;
1575 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* we asked for a checksum but the server did not send one */
1578 } else if (unlikely(client_cksum)) {
1579 static int cksum_missed;
/* rate-limit: only log on power-of-two occurrences */
1582 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1583 CERROR("Checksum %u requested from %s but not sent\n",
1584 cksum_missed, libcfs_nid2str(peer->nid));
/* copy the server's reply obdo back into our local copy */
1590 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1591 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW RPC after a recoverable error.
 *
 * @request  the failed original request
 * @aa       its async args (pages, oaps, extents, resend count)
 * @rc       the error that triggered the redo (used for log level only)
 *
 * Builds a fresh request over the same page array, transfers the async
 * state (oaps, extents, callbacks) from the old request to the new one,
 * re-points each oap at the new request, and queues it on ptlrpcd.
 * Returns 0 on success or the osc_brw_prep_request() error.
 */
1596 static int osc_brw_redo_request(struct ptlrpc_request *request,
1597 struct osc_brw_async_args *aa, int rc)
1599 struct ptlrpc_request *new_req;
1600 struct osc_brw_async_args *new_aa;
1601 struct osc_async_page *oap;
/* -EINPROGRESS is expected/benign, log quietly; anything else loudly */
1604 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1605 "redo for recoverable error %d", rc);
1607 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1608 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1609 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1610 aa->aa_ppga, &new_req, 1);
/* abort the redo if any page's I/O was interrupted by a signal */
1614 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1615 if (oap->oap_request != NULL) {
1616 LASSERTF(request == oap->oap_request,
1617 "request %p != oap_request %p\n",
1618 request, oap->oap_request);
1619 if (oap->oap_interrupted) {
1620 ptlrpc_req_finished(new_req);
1625 /* New request takes over pga and oaps from old request.
1626 * Note that copying a list_head doesn't work, need to move it... */
1628 new_req->rq_interpret_reply = request->rq_interpret_reply;
1629 new_req->rq_async_args = request->rq_async_args;
1630 new_req->rq_commit_cb = request->rq_commit_cb;
1631 /* cap resend delay to the current request timeout, this is similar to
1632 * what ptlrpc does (see after_reply()) */
1633 if (aa->aa_resends > new_req->rq_timeout)
1634 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1636 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
/* keep the old import generation so eviction detection still works */
1637 new_req->rq_generation_set = 1;
1638 new_req->rq_import_generation = request->rq_import_generation;
1640 new_aa = ptlrpc_req_async_args(new_req);
/* splice the oap/extent lists over; a struct copy of list_heads
 * would leave dangling pointers, hence list_splice_init() */
1642 INIT_LIST_HEAD(&new_aa->aa_oaps);
1643 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1644 INIT_LIST_HEAD(&new_aa->aa_exts);
1645 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1646 new_aa->aa_resends = aa->aa_resends;
/* each oap drops its ref on the old request and takes the new one */
1648 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1649 if (oap->oap_request) {
1650 ptlrpc_req_finished(oap->oap_request);
1651 oap->oap_request = ptlrpc_request_addref(new_req);
1655 /* XXX: This code will run into problem if we're going to support
1656 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1657 * and wait for all of them to be finished. We should inherit request
1658 * set from old request. */
1659 ptlrpcd_add_req(new_req);
1661 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Sort @array of @num brw_pages in ascending ->off order (shellsort),
 * so the target sees the extent in offset order and can allocate disk
 * blocks sequentially.  In-place, no allocation; fine for the small
 * per-RPC page arrays this is used on.
 */
1666 * ugh, we want disk allocation on the target to happen in offset order. we'll
1667 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1668 * fine for our small page arrays and doesn't require allocation. its an
1669 * insertion sort that swaps elements that are strides apart, shrinking the
1670 * stride down until its '1' and the array is sorted.
1672 static void sort_brw_pages(struct brw_page **array, int num)
1675 struct brw_page *tmp;
/* Knuth's 3h+1 gap sequence, largest gap first */
1679 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* insertion sort within each stride, shrinking the stride to 1 */
1684 for (i = stride ; i < num ; i++) {
1687 while (j >= stride && array[j - stride]->off > tmp->off) {
1688 array[j] = array[j - stride];
1693 } while (stride > 1);
/*
 * Free the array of brw_page pointers allocated for a BRW RPC.
 * Frees only the pointer array itself, not the pages it points at.
 */
1696 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1698 LASSERT(ppga != NULL);
1699 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * rq_interpret_reply callback for BRW RPCs.
 *
 * Finalizes the transfer via osc_brw_fini_request(), retries
 * recoverable errors (always for -EINPROGRESS, otherwise up to the
 * client resend limit), and on completion: updates the cl_object
 * attributes (blocks/times, and size/KMS extension for writes),
 * finishes all extents of the RPC, releases the page array, and
 * decrements the read/write in-flight counters before re-plugging
 * queued I/O.
 */
1702 static int brw_interpret(const struct lu_env *env,
1703 struct ptlrpc_request *req, void *data, int rc)
1705 struct osc_brw_async_args *aa = data;
1706 struct osc_extent *ext;
1707 struct osc_extent *tmp;
1708 struct client_obd *cli = aa->aa_cli;
1711 rc = osc_brw_fini_request(req, rc);
1712 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1713 /* When server return -EINPROGRESS, client should always retry
1714 * regardless of the number of times the bulk was resent already. */
1715 if (osc_recoverable_error(rc)) {
/* import generation changed: client was evicted, don't resend */
1716 if (req->rq_import_generation !=
1717 req->rq_import->imp_generation) {
1718 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1719 ""DOSTID", rc = %d.\n",
1720 req->rq_import->imp_obd->obd_name,
1721 POSTID(&aa->aa_oa->o_oi), rc);
1722 } else if (rc == -EINPROGRESS ||
1723 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1724 rc = osc_brw_redo_request(req, aa, rc);
1726 CERROR("%s: too many resent retries for object: "
1727 "%llu:%llu, rc = %d.\n",
1728 req->rq_import->imp_obd->obd_name,
1729 POSTID(&aa->aa_oa->o_oi), rc);
1734 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* propagate reply attributes into the cl_object under its attr lock */
1739 struct obdo *oa = aa->aa_oa;
1740 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1741 unsigned long valid = 0;
1742 struct cl_object *obj;
1743 struct osc_async_page *last;
/* last page of the (offset-sorted) array bounds the write extent */
1745 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1746 obj = osc2cl(last->oap_obj);
1748 cl_object_attr_lock(obj);
1749 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1750 attr->cat_blocks = oa->o_blocks;
1751 valid |= CAT_BLOCKS;
1753 if (oa->o_valid & OBD_MD_FLMTIME) {
1754 attr->cat_mtime = oa->o_mtime;
1757 if (oa->o_valid & OBD_MD_FLATIME) {
1758 attr->cat_atime = oa->o_atime;
1761 if (oa->o_valid & OBD_MD_FLCTIME) {
1762 attr->cat_ctime = oa->o_ctime;
1766 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1767 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1768 loff_t last_off = last->oap_count + last->oap_obj_off +
1771 /* Change file size if this is an out of quota or
1772 * direct IO write and it extends the file size */
1773 if (loi->loi_lvb.lvb_size < last_off) {
1774 attr->cat_size = last_off;
1777 /* Extend KMS if it's not a lockless write */
1778 if (loi->loi_kms < last_off &&
1779 oap2osc_page(last)->ops_srvlock == 0) {
1780 attr->cat_kms = last_off;
1786 cl_object_attr_update(env, obj, attr, valid);
1787 cl_object_attr_unlock(obj);
1789 OBDO_FREE(aa->aa_oa);
/* successful writes leave pages "unstable" until commit (brw_commit) */
1791 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1792 osc_inc_unstable_pages(req);
/* finish every extent that was part of this RPC */
1794 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1795 list_del_init(&ext->oe_link);
1796 osc_extent_finish(env, ext, 1, rc);
1798 LASSERT(list_empty(&aa->aa_exts));
1799 LASSERT(list_empty(&aa->aa_oaps));
1801 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1802 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1804 spin_lock(&cli->cl_loi_list_lock);
1805 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1806 * is called so we know whether to go to sync BRWs or wait for more
1807 * RPCs to complete */
1808 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1809 cli->cl_w_in_flight--;
1811 cli->cl_r_in_flight--;
1812 osc_wake_cache_waiters(cli);
1813 spin_unlock(&cli->cl_loi_list_lock);
/* a slot freed up: push out any queued I/O */
1815 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW RPCs: called when the transaction commits on
 * the server.  Clears the unstable-page accounting set up in
 * brw_interpret(), taking rq_lock to close the race described below.
 */
1819 static void brw_commit(struct ptlrpc_request *req)
1821 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1822 * this called via the rq_commit_cb, I need to ensure
1823 * osc_dec_unstable_pages is still called. Otherwise unstable
1824 * pages may be leaked. */
1825 spin_lock(&req->rq_lock)`;
1826 if (likely(req->rq_unstable)) {
1827 req->rq_unstable = 0;
1828 spin_unlock(&req->rq_lock);
/* drop the lock before the potentially heavier accounting call */
1830 osc_dec_unstable_pages(req);
1832 req->rq_committed = 1;
1833 spin_unlock(&req->rq_lock);
/*
 * Assemble one BRW RPC from @ext_list and hand it to ptlrpcd.
 *
 * @env       lu environment
 * @cli       client obd the RPC is sent through
 * @ext_list  extents to include; all must be in OES_RPC state and their
 *            total page count must not exceed the per-RPC maximum
 * @cmd       OBD_BRW_READ or OBD_BRW_WRITE
 *
 * Collects every async page of every extent into a brw_page array,
 * sorts it by offset, builds the request, records per-RPC statistics
 * and bumps the in-flight counter.  On failure all extents are finished
 * with an error so their pages are not leaked.
 */
1838 * Build an RPC by the list of extent @ext_list. The caller must ensure
1839 * that the total pages in this list are NOT over max pages per RPC.
1840 * Extents in the list must be in OES_RPC state.
1842 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1843 struct list_head *ext_list, int cmd)
1845 struct ptlrpc_request *req = NULL;
1846 struct osc_extent *ext;
1847 struct brw_page **pga = NULL;
1848 struct osc_brw_async_args *aa = NULL;
1849 struct obdo *oa = NULL;
1850 struct osc_async_page *oap;
1851 struct osc_object *obj = NULL;
1852 struct cl_req_attr *crattr = NULL;
1853 loff_t starting_offset = OBD_OBJECT_EOF;
1854 loff_t ending_offset = 0;
1858 bool soft_sync = false;
1859 bool interrupted = false;
1863 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1864 struct ost_body *body;
1866 LASSERT(!list_empty(ext_list));
1868 /* add pages into rpc_list to build BRW rpc */
1869 list_for_each_entry(ext, ext_list, oe_link) {
1870 LASSERT(ext->oe_state == OES_RPC);
1871 mem_tight |= ext->oe_memalloc;
1872 grant += ext->oe_grants;
1873 page_count += ext->oe_nr_pages;
1878 soft_sync = osc_over_unstable_soft_limit(cli);
/* allocations below may run under memory pressure for writeback */
1880 mpflag = cfs_memory_pressure_get_and_set();
1882 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1884 GOTO(out, rc = -ENOMEM);
1888 GOTO(out, rc = -ENOMEM);
/* flatten every extent's pages into pga[] and rpc_list, tracking the
 * overall [starting_offset, ending_offset) byte range of the RPC */
1891 list_for_each_entry(ext, ext_list, oe_link) {
1892 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1894 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1896 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1897 pga[i] = &oap->oap_brw_page;
1898 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1901 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1902 if (starting_offset == OBD_OBJECT_EOF ||
1903 starting_offset > oap->oap_obj_off)
1904 starting_offset = oap->oap_obj_off;
1906 LASSERT(oap->oap_page_off == 0);
1907 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1908 ending_offset = oap->oap_obj_off +
1911 LASSERT(oap->oap_page_off + oap->oap_count ==
1913 if (oap->oap_interrupted)
1918 /* first page in the list */
1919 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item)`;
/* fill the request attributes (jobid, obdo fields) from the cl layer */
1921 crattr = &osc_env_info(env)->oti_req_attr;
1922 memset(crattr, 0, sizeof(*crattr));
1923 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1924 crattr->cra_flags = ~0ULL;
1925 crattr->cra_page = oap2cl_page(oap);
1926 crattr->cra_oa = oa;
1927 cl_req_attr_set(env, osc2cl(obj), crattr);
1929 if (cmd == OBD_BRW_WRITE)
1930 oa->o_grant_used = grant;
/* target allocates better when niobufs arrive in offset order */
1932 sort_brw_pages(pga, page_count);
1933 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1935 CERROR("prep_req failed: %d\n", rc);
1939 req->rq_commit_cb = brw_commit;
1940 req->rq_interpret_reply = brw_interpret;
1941 req->rq_memalloc = mem_tight != 0;
1942 oap->oap_request = ptlrpc_request_addref(req);
1943 if (interrupted && !req->rq_intr)
1944 ptlrpc_mark_interrupted(req);
1946 /* Need to update the timestamps after the request is built in case
1947 * we race with setattr (locally or in queue at OST). If OST gets
1948 * later setattr before earlier BRW (as determined by the request xid),
1949 * the OST will not use BRW timestamps. Sadly, there is no obvious
1950 * way to do this in a single call. bug 10150 */
1951 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1952 crattr->cra_oa = &body->oa;
1953 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1954 cl_req_attr_set(env, osc2cl(obj), crattr);
1955 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1957 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1958 aa = ptlrpc_req_async_args(req);
/* the request takes ownership of the oap and extent lists */
1959 INIT_LIST_HEAD(&aa->aa_oaps);
1960 list_splice_init(&rpc_list, &aa->aa_oaps);
1961 INIT_LIST_HEAD(&aa->aa_exts);
1962 list_splice_init(ext_list, &aa->aa_exts);
/* account the RPC in flight and record size/offset histograms */
1964 spin_lock(&cli->cl_loi_list_lock);
1965 starting_offset >>= PAGE_SHIFT;
1966 if (cmd == OBD_BRW_READ) {
1967 cli->cl_r_in_flight++;
1968 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1969 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1970 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1971 starting_offset + 1);
1973 cli->cl_w_in_flight++;
1974 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1975 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1976 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1977 starting_offset + 1);
1979 spin_unlock(&cli->cl_loi_list_lock);
1981 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1982 page_count, aa, cli->cl_r_in_flight,
1983 cli->cl_w_in_flight);
1984 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1986 ptlrpcd_add_req(req);
1992 cfs_memory_pressure_restore(mpflag);
/* error path: nothing was queued, clean up everything we built */
1995 LASSERT(req == NULL);
2000 OBD_FREE(pga, sizeof(*pga) * page_count);
2001 /* this should happen rarely and is pretty bad, it makes the
2002 * pending list not follow the dirty order */
2003 while (!list_empty(ext_list)) {
2004 ext = list_entry(ext_list->next, struct osc_extent,
2006 list_del_init(&ext->oe_link);
2007 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data as the AST data of @lock if the slot is free (or already
 * holds @data).  Done under the lock's resource lock.
 * NOTE(review): the return statements are elided from this excerpt;
 * callers treat the result as "data successfully set/matching" (see
 * osc_enqueue_base()/osc_match_base()) - confirm against full source.
 */
2013 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2017 LASSERT(lock != NULL);
2019 lock_res_and_lock(lock);
2021 if (lock->l_ast_data == NULL)
2022 lock->l_ast_data = data;
2023 if (lock->l_ast_data == data)
2026 unlock_res_and_lock(lock);
/*
 * Complete an OSC lock enqueue: translate an intent-aborted reply into
 * its real status, mark the LVB ready, invoke the caller's upcall, and
 * drop the enqueue reference on the lock.
 *
 * @req         the enqueue request (reply already unpacked)
 * @upcall      caller callback receiving the final errcode
 * @cookie      opaque argument for @upcall
 * @lockh       handle of the (possibly) granted lock
 * @mode        mode the reference was taken in
 * @flags       in/out LDLM flags; LDLM_FL_LVB_READY is set on success
 * @speculative true for speculative (AGL-style) enqueues
 * @errcode     ELDLM_* result from ldlm_cli_enqueue_fini()
 */
2031 static int osc_enqueue_fini(struct ptlrpc_request *req,
2032 osc_enqueue_upcall_f upcall, void *cookie,
2033 struct lustre_handle *lockh, enum ldlm_mode mode,
2034 __u64 *flags, bool speculative, int errcode)
2036 bool intent = *flags & LDLM_FL_HAS_INTENT;
2040 /* The request was created before ldlm_cli_enqueue call. */
2041 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2042 struct ldlm_reply *rep;
2044 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2045 LASSERT(rep != NULL);
/* the intent result travels in lock_policy_res1, network-endian */
2047 rep->lock_policy_res1 =
2048 ptlrpc_status_ntoh(rep->lock_policy_res1);
2049 if (rep->lock_policy_res1)
2050 errcode = rep->lock_policy_res1;
2052 *flags |= LDLM_FL_LVB_READY;
2053 } else if (errcode == ELDLM_OK) {
2054 *flags |= LDLM_FL_LVB_READY;
2057 /* Call the update callback. */
2058 rc = (*upcall)(cookie, lockh, errcode);
2060 /* release the reference taken in ldlm_cli_enqueue() */
2061 if (errcode == ELDLM_LOCK_MATCHED)
2063 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2064 ldlm_lock_decref(lockh, mode);
/*
 * rq_interpret_reply callback for asynchronous lock enqueues.
 *
 * Re-acquires the lock from the handle stashed in @aa, takes an extra
 * reference so a blocking AST cannot race the upcall, completes the
 * ldlm side via ldlm_cli_enqueue_fini() and the osc side via
 * osc_enqueue_fini(), then drops the extra reference.
 */
2069 static int osc_enqueue_interpret(const struct lu_env *env,
2070 struct ptlrpc_request *req,
2071 struct osc_enqueue_args *aa, int rc)
2073 struct ldlm_lock *lock;
2074 struct lustre_handle *lockh = &aa->oa_lockh;
2075 enum ldlm_mode mode = aa->oa_mode;
2076 struct ost_lvb *lvb = aa->oa_lvb;
2077 __u32 lvb_len = sizeof(*lvb);
2082 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2084 lock = ldlm_handle2lock(lockh);
2085 LASSERTF(lock != NULL,
2086 "lockh %#llx, req %p, aa %p - client evicted?\n",
2087 lockh->cookie, req, aa);
2089 /* Take an additional reference so that a blocking AST that
2090 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2091 * to arrive after an upcall has been executed by
2092 * osc_enqueue_fini(). */
2093 ldlm_lock_addref(lockh, mode);
2095 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2096 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2098 /* Let CP AST to grant the lock first. */
2099 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* speculative enqueues carry no LVB/flags; supply a local flags word */
2101 if (aa->oa_speculative) {
2102 LASSERT(aa->oa_lvb == NULL);
2103 LASSERT(aa->oa_flags == NULL);
2104 aa->oa_flags = &flags;
2107 /* Complete obtaining the lock procedure. */
2108 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2109 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2111 /* Complete osc stuff. */
2112 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2113 aa->oa_flags, aa->oa_speculative, rc);
2115 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above */
2117 ldlm_lock_decref(lockh, mode);
2118 LDLM_LOCK_PUT(lock);
/* Sentinel rqset value: callers pass PTLRPCD_SET instead of a real
 * request set to mean "queue the request on ptlrpcd" (compared by
 * pointer identity in osc_enqueue_base(); never dereferenced). */
2122 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/*
 * Obtain an extent DLM lock for OSC I/O, preferring a locally cached
 * compatible lock over a network enqueue.
 *
 * Rounds the requested extent out to page boundaries, tries
 * ldlm_lock_match() first (also matching PW locks for readers), and
 * falls back to ldlm_cli_enqueue().  @async with a @rqset sends the
 * request via ptlrpcd/the set and completes in osc_enqueue_interpret();
 * otherwise completion is inline via osc_enqueue_fini().  Speculative
 * enqueues only pre-populate the namespace and do not reference-hold a
 * matched lock.
 */
2124 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2125 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2126 * other synchronous requests, however keeping some locks and trying to obtain
2127 * others may take a considerable amount of time in a case of ost failure; and
2128 * when other sync requests do not get released lock from a client, the client
2129 * is evicted from the cluster -- such scenarious make the life difficult, so
2130 * release locks just after they are obtained. */
2131 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2132 __u64 *flags, union ldlm_policy_data *policy,
2133 struct ost_lvb *lvb, int kms_valid,
2134 osc_enqueue_upcall_f upcall, void *cookie,
2135 struct ldlm_enqueue_info *einfo,
2136 struct ptlrpc_request_set *rqset, int async,
2139 struct obd_device *obd = exp->exp_obd;
2140 struct lustre_handle lockh = { 0 };
2141 struct ptlrpc_request *req = NULL;
2142 int intent = *flags & LDLM_FL_HAS_INTENT;
2143 __u64 match_flags = *flags;
2144 enum ldlm_mode mode;
2148 /* Filesystem lock extents are extended to page boundaries so that
2149 * dealing with the page cache is a little smoother. */
2150 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2151 policy->l_extent.end |= ~PAGE_MASK;
2154 * kms is not valid when either object is completely fresh (so that no
2155 * locks are cached), or object was evicted. In the latter case cached
2156 * lock cannot be used, because it would prime inode state with
2157 * potentially stale LVB.
2162 /* Next, search for already existing extent locks that will cover us */
2163 /* If we're trying to read, we also search for an existing PW lock. The
2164 * VFS and page cache already protect us locally, so lots of readers/
2165 * writers can share a single PW lock.
2167 * There are problems with conversion deadlocks, so instead of
2168 * converting a read lock to a write lock, we'll just enqueue a new
2171 * At some point we should cancel the read lock instead of making them
2172 * send us a blocking callback, but there are problems with canceling
2173 * locks out from other users right now, too. */
2174 mode = einfo->ei_mode;
2175 if (einfo->ei_mode == LCK_PR)
2177 /* Normal lock requests must wait for the LVB to be ready before
2178 * matching a lock; speculative lock requests do not need to,
2179 * because they will not actually use the lock. */
2181 match_flags |= LDLM_FL_LVB_READY;
2183 match_flags |= LDLM_FL_BLOCK_GRANTED;
2184 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2185 einfo->ei_type, policy, mode, &lockh, 0);
/* a cached lock matched: decide whether it can be (re)used */
2187 struct ldlm_lock *matched;
2189 if (*flags & LDLM_FL_TEST_LOCK)
2192 matched = ldlm_handle2lock(&lockh);
2194 /* This DLM lock request is speculative, and does not
2195 * have an associated IO request. Therefore if there
2196 * is already a DLM lock, it wll just inform the
2197 * caller to cancel the request for this stripe.*/
2198 lock_res_and_lock(matched);
2199 if (ldlm_extent_equal(&policy->l_extent,
2200 &matched->l_policy_data.l_extent))
2204 unlock_res_and_lock(matched);
2206 ldlm_lock_decref(&lockh, mode);
2207 LDLM_LOCK_PUT(matched);
2209 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2210 *flags |= LDLM_FL_LVB_READY;
2212 /* We already have a lock, and it's referenced. */
2213 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2215 ldlm_lock_decref(&lockh, mode);
2216 LDLM_LOCK_PUT(matched);
2219 ldlm_lock_decref(&lockh, mode);
2220 LDLM_LOCK_PUT(matched);
2225 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* no usable cached lock: build a real enqueue request with LVB reply */
2229 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2230 &RQF_LDLM_ENQUEUE_LVB);
2234 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2236 ptlrpc_request_free(req);
2240 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2242 ptlrpc_request_set_replen(req);
2245 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2246 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2248 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2249 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* async path: stash completion state and queue the request */
2252 struct osc_enqueue_args *aa;
2253 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2254 aa = ptlrpc_req_async_args(req);
2256 aa->oa_mode = einfo->ei_mode;
2257 aa->oa_type = einfo->ei_type;
2258 lustre_handle_copy(&aa->oa_lockh, &lockh);
2259 aa->oa_upcall = upcall;
2260 aa->oa_cookie = cookie;
2261 aa->oa_speculative = speculative;
2263 aa->oa_flags = flags;
2266 /* speculative locks are essentially to enqueue
2267 * a DLM lock in advance, so we don't care
2268 * about the result of the enqueue. */
2270 aa->oa_flags = NULL;
2273 req->rq_interpret_reply =
2274 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2275 if (rqset == PTLRPCD_SET)
2276 ptlrpcd_add_req(req);
2278 ptlrpc_set_add_req(rqset, req);
2279 } else if (intent) {
2280 ptlrpc_req_finished(req);
/* sync path: complete inline */
2285 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2286 flags, speculative, rc);
2288 ptlrpc_req_finished(req);
/*
 * Match an already-cached extent DLM lock without enqueueing.
 *
 * Rounds the extent out to page boundaries and calls ldlm_lock_match();
 * on a match (and unless LDLM_FL_TEST_LOCK), attaches @data as the
 * lock's AST data, dropping the reference if the data slot is already
 * owned by someone else.  Returns the matched mode or 0.
 */
2293 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2294 enum ldlm_type type, union ldlm_policy_data *policy,
2295 enum ldlm_mode mode, __u64 *flags, void *data,
2296 struct lustre_handle *lockh, int unref)
2298 struct obd_device *obd = exp->exp_obd;
2299 __u64 lflags = *flags;
/* fault-injection point for testing the no-match path */
2303 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2306 /* Filesystem lock extents are extended to page boundaries so that
2307 * dealing with the page cache is a little smoother */
2308 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2309 policy->l_extent.end |= ~PAGE_MASK;
2311 /* Next, search for already existing extent locks that will cover us */
2312 /* If we're trying to read, we also search for an existing PW lock. The
2313 * VFS and page cache already protect us locally, so lots of readers/
2314 * writers can share a single PW lock. */
2318 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2319 res_id, type, policy, rc, lockh, unref);
2320 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2324 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2326 LASSERT(lock != NULL);
/* someone else owns the AST data: give the match back */
2327 if (!osc_set_lock_data(lock, data)) {
2328 ldlm_lock_decref(lockh, rc);
2331 LDLM_LOCK_PUT(lock);
/*
 * rq_interpret_reply callback for async OST_STATFS: copy the server's
 * obd_statfs into the caller's obd_info and invoke its completion
 * callback.  -ENOTCONN/-EAGAIN with OBD_STATFS_NODELAY are passed
 * through untouched so a non-blocking caller can react.
 */
2336 static int osc_statfs_interpret(const struct lu_env *env,
2337 struct ptlrpc_request *req,
2338 struct osc_async_args *aa, int rc)
2340 struct obd_statfs *msfs;
2344 /* The request has in fact never been sent
2345 * due to issues at a higher level (LOV).
2346 * Exit immediately since the caller is
2347 * aware of the problem and takes care
2348 * of the clean up */
2351 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2352 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2358 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2360 GOTO(out, rc = -EPROTO);
/* struct copy of the server's filesystem statistics */
2363 *aa->aa_oi->oi_osfs = *msfs;
2365 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_STATFS request on @rqset; the reply is
 * delivered to @oinfo via osc_statfs_interpret().
 *
 * NOTE(review): @max_age is accepted but not visibly used in this
 * excerpt (see the comment below about passing it to the target) -
 * confirm against full source.
 */
2369 static int osc_statfs_async(struct obd_export *exp,
2370 struct obd_info *oinfo, __u64 max_age,
2371 struct ptlrpc_request_set *rqset)
2373 struct obd_device *obd = class_exp2obd(exp);
2374 struct ptlrpc_request *req;
2375 struct osc_async_args *aa;
2379 /* We could possibly pass max_age in the request (as an absolute
2380 * timestamp or a "seconds.usec ago") so the target can avoid doing
2381 * extra calls into the filesystem if that isn't necessary (e.g.
2382 * during mount that would help a bit). Having relative timestamps
2383 * is not so great if request processing is slow, while absolute
2384 * timestamps are not ideal because they need time synchronization. */
2385 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2389 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2391 ptlrpc_request_free(req);
2394 ptlrpc_request_set_replen(req);
/* statfs is served from the OST's create portal */
2395 req->rq_request_portal = OST_CREATE_PORTAL;
2396 ptlrpc_at_set_req_timeout(req);
2398 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2399 /* procfs requests not want stat in wait for avoid deadlock */
2400 req->rq_no_resend = 1;
2401 req->rq_no_delay = 1;
2404 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2405 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2406 aa = ptlrpc_req_async_args(req);
2409 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the request with ptlrpc_queue_wait() and
 * copy the returned obd_statfs into @osfs.
 *
 * The import is looked up under cl_sem with an extra reference because the
 * caller may race with client_disconnect_export() (see Bug15684 below).
 */
2413 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2414 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2416 struct obd_device *obd = class_exp2obd(exp);
2417 struct obd_statfs *msfs;
2418 struct ptlrpc_request *req;
2419 struct obd_import *imp = NULL;
2423 /*Since the request might also come from lprocfs, so we need
2424 *sync this with client_disconnect_export Bug15684*/
2425 down_read(&obd->u.cli.cl_sem);
2426 if (obd->u.cli.cl_import)
2427 imp = class_import_get(obd->u.cli.cl_import);
2428 up_read(&obd->u.cli.cl_sem);
2432 /* We could possibly pass max_age in the request (as an absolute
2433 * timestamp or a "seconds.usec ago") so the target can avoid doing
2434 * extra calls into the filesystem if that isn't necessary (e.g.
2435 * during mount that would help a bit). Having relative timestamps
2436 * is not so great if request processing is slow, while absolute
2437 * timestamps are not ideal because they need time synchronization. */
2438 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* the request (if allocated) holds its own import ref; drop ours */
2440 class_import_put(imp);
2445 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2447 ptlrpc_request_free(req);
2450 ptlrpc_request_set_replen(req);
2451 req->rq_request_portal = OST_CREATE_PORTAL;
2452 ptlrpc_at_set_req_timeout(req);
2454 if (flags & OBD_STATFS_NODELAY) {
2455 /* procfs requests not want stat in wait for avoid deadlock */
2456 req->rq_no_resend = 1;
2457 req->rq_no_delay = 1;
2460 rc = ptlrpc_queue_wait(req);
2464 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* missing/short reply buffer is a protocol error */
2466 GOTO(out, rc = -EPROTO);
2473 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.
 *
 * Pins this module for the duration of the call, then handles the small
 * set of recognised commands (client recovery, import activation, target
 * ping); anything else returns -ENOTTY.
 */
2477 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2478 void *karg, void __user *uarg)
2480 struct obd_device *obd = exp->exp_obd;
2481 struct obd_ioctl_data *data = karg;
/* prevent the module from unloading while an ioctl is in flight */
2485 if (!try_module_get(THIS_MODULE)) {
2486 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2487 module_name(THIS_MODULE));
2491 case OBD_IOC_CLIENT_RECOVER:
2492 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2493 data->ioc_inlbuf1, 0);
2497 case IOC_OSC_SET_ACTIVE:
2498 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2501 case OBD_IOC_PING_TARGET:
2502 err = ptlrpc_obd_ping(obd);
2505 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2506 cmd, current_comm());
2507 GOTO(out, err = -ENOTTY);
/* release the module reference taken above */
2510 module_put(THIS_MODULE);
/*
 * Set a named key/value parameter on this OSC, possibly forwarding it to
 * the OST as an OST_SET_INFO RPC.
 *
 * Locally-handled keys: KEY_CHECKSUM (toggle checksums), KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET (attach the shared client LRU cache, once),
 * KEY_CACHE_LRU_SHRINK (shrink the LRU now).  All other keys are packed
 * into an RPC; KEY_GRANT_SHRINK uses the grant-info request format and is
 * queued on ptlrpcd instead of the caller's @set.
 */
2514 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2515 u32 keylen, void *key, u32 vallen, void *val,
2516 struct ptlrpc_request_set *set)
2518 struct ptlrpc_request *req;
2519 struct obd_device *obd = exp->exp_obd;
2520 struct obd_import *imp = class_exp2cliimp(exp);
/* fault-injection hook for shutdown testing */
2525 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2527 if (KEY_IS(KEY_CHECKSUM)) {
2528 if (vallen != sizeof(int))
2530 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2534 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2535 sptlrpc_conf_client_adapt(obd);
2539 if (KEY_IS(KEY_FLUSH_CTX)) {
2540 sptlrpc_import_flush_my_ctx(imp);
2544 if (KEY_IS(KEY_CACHE_SET)) {
2545 struct client_obd *cli = &obd->u.cli;
2547 LASSERT(cli->cl_cache == NULL); /* only once */
2548 cli->cl_cache = (struct cl_client_cache *)val;
2549 cl_cache_incref(cli->cl_cache);
2550 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2552 /* add this osc into entity list */
2553 LASSERT(list_empty(&cli->cl_lru_osc));
2554 spin_lock(&cli->cl_cache->ccc_lru_lock);
2555 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2556 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2561 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2562 struct client_obd *cli = &obd->u.cli;
/* shrink at most half of what is currently on the LRU */
2563 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2564 long target = *(long *)val;
2566 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* every remaining key except GRANT_SHRINK needs a caller-supplied set */
2571 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2574 /* We pass all other commands directly to OST. Since nobody calls osc
2575 methods directly and everybody is supposed to go through LOV, we
2576 assume lov checked invalid values for us.
2577 The only recognised values so far are evict_by_nid and mds_conn.
2578 Even if something bad goes through, we'd get a -EINVAL from OST
2581 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2582 &RQF_OST_SET_GRANT_INFO :
2587 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2588 RCL_CLIENT, keylen);
2589 if (!KEY_IS(KEY_GRANT_SHRINK))
2590 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2591 RCL_CLIENT, vallen);
2592 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2594 ptlrpc_request_free(req);
/* copy key and value into the request buffers */
2598 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2599 memcpy(tmp, key, keylen);
2600 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2603 memcpy(tmp, val, vallen);
2605 if (KEY_IS(KEY_GRANT_SHRINK)) {
2606 struct osc_grant_args *aa;
2609 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2610 aa = ptlrpc_req_async_args(req);
2613 ptlrpc_req_finished(req);
2616 *oa = ((struct ost_body *)val)->oa;
2618 req->rq_interpret_reply = osc_shrink_grant_interpret;
2621 ptlrpc_request_set_replen(req);
2622 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2623 LASSERT(set != NULL);
2624 ptlrpc_set_add_req(set, req);
2625 ptlrpc_check_set(NULL, set);
/* grant shrink requests run on the ptlrpcd threads */
2627 ptlrpcd_add_req(req);
2632 EXPORT_SYMBOL(osc_set_info_async);
/*
 * Reconnect hook: recompute the grant this client will request from the
 * server on (re)connect.
 *
 * Under cl_loi_list_lock the available, reserved and dirty grant are
 * summed; if no grant is held, ask for twice the BRW size.  cl_lost_grant
 * is consumed (reset to 0) and only reported via CDEBUG.
 */
2634 static int osc_reconnect(const struct lu_env *env,
2635 struct obd_export *exp, struct obd_device *obd,
2636 struct obd_uuid *cluuid,
2637 struct obd_connect_data *data,
2640 struct client_obd *cli = &obd->u.cli;
2642 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2646 spin_lock(&cli->cl_loi_list_lock);
2647 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* with GRANT_PARAM the server accounts dirty data in grant units */
2648 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2649 grant += cli->cl_dirty_grant;
2651 grant += cli->cl_dirty_pages << PAGE_SHIFT;
2652 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2653 lost_grant = cli->cl_lost_grant;
2654 cli->cl_lost_grant = 0;
2655 spin_unlock(&cli->cl_loi_list_lock);
2657 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2658 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2659 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect hook: tear down the export, then — only once the import is
 * gone — remove this client from the grant-shrink list (see the BUG18662
 * race described below).
 */
2665 static int osc_disconnect(struct obd_export *exp)
2667 struct obd_device *obd = class_exp2obd(exp);
2670 rc = client_disconnect_export(exp);
2672 * Initially we put del_shrink_grant before disconnect_export, but it
2673 * causes the following problem if setup (connect) and cleanup
2674 * (disconnect) are tangled together.
2675 * connect p1 disconnect p2
2676 * ptlrpc_connect_import
2677 * ............... class_manual_cleanup
2680 * ptlrpc_connect_interrupt
2682 * add this client to shrink list
2684 * Bang! pinger trigger the shrink.
2685 * So the osc should be disconnected from the shrink list, after we
2686 * are sure the import has been destroyed. BUG18662
2688 if (obd->u.cli.cl_import == NULL)
2689 osc_del_shrink_grant(&obd->u.cli);
/*
 * cfs_hash iterator callback: invalidate the osc_object backing one LDLM
 * resource.
 *
 * Walks the granted lock list to find the first lock carrying l_ast_data
 * (the osc_object), takes a cl_object reference, clears each lock's
 * CLEANED flag so a second ldlm_namespace_cleanup() pass can cancel it,
 * then invalidates the object and drops the reference.
 */
2693 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2694 struct hlist_node *hnode, void *arg)
2696 struct lu_env *env = arg;
2697 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2698 struct ldlm_lock *lock;
2699 struct osc_object *osc = NULL;
2703 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2704 if (lock->l_ast_data != NULL && osc == NULL) {
2705 osc = lock->l_ast_data;
2706 cl_object_get(osc2cl(osc));
2709 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2710 * by the 2nd round of ldlm_namespace_clean() call in
2711 * osc_import_event(). */
2712 ldlm_clear_cleaned(lock);
2717 osc_object_invalidate(env, osc);
2718 cl_object_put(env, osc2cl(osc));
2723 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
/*
 * React to import state changes (disconnect, invalidate, activate, ...).
 *
 * DISCON zeroes the grant accounting; INVALIDATE cleans the namespace,
 * flushes cached pages and invalidates osc_objects (two cleanup passes,
 * see osc_ldlm_resource_invalidate); OCD re-initializes grant and request
 * portal from the negotiated connect data.  Most events are also relayed
 * to the OBD observer.
 */
2725 static int osc_import_event(struct obd_device *obd,
2726 struct obd_import *imp,
2727 enum obd_import_event event)
2729 struct client_obd *cli;
2733 LASSERT(imp->imp_obd == obd);
2736 case IMP_EVENT_DISCON: {
/* grant is meaningless across a disconnect; reset the counters */
2738 spin_lock(&cli->cl_loi_list_lock);
2739 cli->cl_avail_grant = 0;
2740 cli->cl_lost_grant = 0;
2741 spin_unlock(&cli->cl_loi_list_lock);
2744 case IMP_EVENT_INACTIVE: {
2745 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2748 case IMP_EVENT_INVALIDATE: {
2749 struct ldlm_namespace *ns = obd->obd_namespace;
/* first pass: reclaim all local locks */
2753 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2755 env = cl_env_get(&refcheck);
/* flush any pending I/O before invalidating cached objects */
2757 osc_io_unplug(env, &obd->u.cli, NULL);
2759 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2760 osc_ldlm_resource_invalidate,
2762 cl_env_put(env, &refcheck);
/* second pass: cancel locks un-CLEANED by the invalidate walk */
2764 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2769 case IMP_EVENT_ACTIVE: {
2770 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2773 case IMP_EVENT_OCD: {
2774 struct obd_connect_data *ocd = &imp->imp_connect_data;
2776 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2777 osc_init_grant(&obd->u.cli, ocd);
2780 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2781 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2783 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2786 case IMP_EVENT_DEACTIVATE: {
2787 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2790 case IMP_EVENT_ACTIVATE: {
2791 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2795 CERROR("Unknown import event %d\n", event);
2802 * Determine whether the lock can be canceled before replaying the lock
2803 * during recovery, see bug16774 for detailed information.
2805 * \retval zero the lock can't be canceled
2806 * \retval other ok to cancel
2808 static int osc_cancel_weight(struct ldlm_lock *lock)
2811 * Cancel all unused and granted extent lock.
/* extent lock, fully granted, and weigh-AST says it is unused */
2813 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2814 lock->l_granted_mode == lock->l_req_mode &&
2815 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback: flush this client's pending writeback by
 * unplugging its I/O queue.
 */
2821 static int brw_queue_work(const struct lu_env *env, void *data)
2823 struct client_obd *cli = data;
2825 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2827 osc_io_unplug(env, cli, NULL);
/*
 * Common OSC/OSP device setup.
 *
 * Takes a ptlrpcd reference, performs generic client setup, allocates the
 * writeback and LRU ptlrpcd work items, and initializes quota and the
 * grant-shrink state.  On failure all partially-created work items are
 * destroyed and client_obd_cleanup() undoes client_obd_setup().
 */
2831 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
2833 struct client_obd *cli = &obd->u.cli;
2839 rc = ptlrpcd_addref();
2843 rc = client_obd_setup(obd, lcfg);
2845 GOTO(out_ptlrpcd, rc);
2848 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2849 if (IS_ERR(handler))
2850 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2851 cli->cl_writeback_work = handler;
2853 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2854 if (IS_ERR(handler))
2855 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2856 cli->cl_lru_work = handler;
2858 rc = osc_quota_setup(obd);
2860 GOTO(out_ptlrpcd_work, rc);
2862 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2864 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
/* error path: destroy whichever work items were created, then clean up */
2868 if (cli->cl_writeback_work != NULL) {
2869 ptlrpcd_destroy_work(cli->cl_writeback_work);
2870 cli->cl_writeback_work = NULL;
2872 if (cli->cl_lru_work != NULL) {
2873 ptlrpcd_destroy_work(cli->cl_lru_work);
2874 cli->cl_lru_work = NULL;
2876 client_obd_cleanup(obd);
2881 EXPORT_SYMBOL(osc_setup_common);
/*
 * OSC-specific device setup (obd_ops.o_setup).
 *
 * Runs osc_setup_common(), wires up procfs/lprocfs (attaching under the
 * OSP's proc symlink when client and server share a node), grows the
 * shared request pool toward osc_reqpool_maxreqcount, registers the
 * cancel-weight callback, and joins the global shrinker list.
 */
2883 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2885 struct client_obd *cli = &obd->u.cli;
2886 struct obd_type *type;
2894 rc = osc_setup_common(obd, lcfg);
2898 #ifdef CONFIG_PROC_FS
2899 obd->obd_vars = lprocfs_osc_obd_vars;
2901 /* If this is true then both client (osc) and server (osp) are on the
2902 * same node. The osp layer if loaded first will register the osc proc
2903 * directory. In that case this obd_device will be attached its proc
2904 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
2906 type = class_search_type(LUSTRE_OSP_NAME);
2907 if (type && type->typ_procsym) {
2908 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2910 obd->obd_vars, obd);
2911 if (IS_ERR(obd->obd_proc_entry)) {
2912 rc = PTR_ERR(obd->obd_proc_entry);
2913 CERROR("error %d setting up lprocfs for %s\n", rc,
/* proc registration failure is non-fatal; continue without it */
2915 obd->obd_proc_entry = NULL;
2919 rc = lprocfs_obd_setup(obd, false);
2921 /* If the basic OSC proc tree construction succeeded then
2924 lproc_osc_attach_seqstat(obd);
2925 sptlrpc_lprocfs_cliobd_attach(obd);
2926 ptlrpc_lprocfs_register_obd(obd);
2930 * We try to control the total number of requests with a upper limit
2931 * osc_reqpool_maxreqcount. There might be some race which will cause
2932 * over-limit allocation, but it is fine.
2934 req_count = atomic_read(&osc_pool_req_count);
2935 if (req_count < osc_reqpool_maxreqcount) {
2936 adding = cli->cl_max_rpcs_in_flight + 2;
2937 if (req_count + adding > osc_reqpool_maxreqcount)
2938 adding = osc_reqpool_maxreqcount - req_count;
2940 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2941 atomic_add(added, &osc_pool_req_count);
2944 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2945 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* make this client visible to the global memory shrinker */
2947 spin_lock(&osc_shrink_lock);
2948 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2949 spin_unlock(&osc_shrink_lock);
/*
 * Common pre-cleanup: wait out zombie exports, destroy the writeback and
 * LRU ptlrpcd work items, then tear down the client import.
 */
2954 int osc_precleanup_common(struct obd_device *obd)
2956 struct client_obd *cli = &obd->u.cli;
2960 * for echo client, export may be on zombie list, wait for
2961 * zombie thread to cull it, because cli.cl_import will be
2962 * cleared in client_disconnect_export():
2963 * class_export_destroy() -> obd_cleanup() ->
2964 * echo_device_free() -> echo_client_cleanup() ->
2965 * obd_disconnect() -> osc_disconnect() ->
2966 * client_disconnect_export()
2968 obd_zombie_barrier();
2969 if (cli->cl_writeback_work) {
2970 ptlrpcd_destroy_work(cli->cl_writeback_work);
2971 cli->cl_writeback_work = NULL;
2974 if (cli->cl_lru_work) {
2975 ptlrpcd_destroy_work(cli->cl_lru_work);
2976 cli->cl_lru_work = NULL;
2979 obd_cleanup_client_import(obd);
2982 EXPORT_SYMBOL(osc_precleanup_common);
/*
 * obd_ops.o_precleanup for OSC: common pre-cleanup plus removal of the
 * lprocfs entries registered in osc_setup().
 */
2984 static int osc_precleanup(struct obd_device *obd)
2988 osc_precleanup_common(obd);
2990 ptlrpc_lprocfs_unregister_obd(obd);
2991 lprocfs_obd_cleanup(obd);
/*
 * Final cleanup: leave the shrinker list, detach from the shared client
 * cache (dropping our reference), free the quota cache and run generic
 * client cleanup.
 */
2995 int osc_cleanup_common(struct obd_device *obd)
2997 struct client_obd *cli = &obd->u.cli;
3002 spin_lock(&osc_shrink_lock);
3003 list_del(&cli->cl_shrink_list);
3004 spin_unlock(&osc_shrink_lock);
/* lru cleanup */
3007 if (cli->cl_cache != NULL) {
3008 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3009 spin_lock(&cli->cl_cache->ccc_lru_lock);
3010 list_del_init(&cli->cl_lru_osc);
3011 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3012 cli->cl_lru_left = NULL;
3013 cl_cache_decref(cli->cl_cache);
3014 cli->cl_cache = NULL;
3017 /* free memory of osc quota cache */
3018 osc_quota_cleanup(obd);
3020 rc = client_obd_cleanup(obd);
3025 EXPORT_SYMBOL(osc_cleanup_common);
/*
 * Apply a config-log parameter to this OSC via the generic proc-param
 * handler; positive return values (number of params matched) map to 0.
 */
3027 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3029 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3030 return rc > 0 ? 0: rc;
/* obd_ops.o_process_config adapter: forward to osc_process_config_base() */
3033 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3035 return osc_process_config_base(obd, buf);
/*
 * OBD method table for the OSC device type, registered in osc_init().
 * Connection management is delegated to the generic client helpers;
 * object, attribute, statfs, ioctl and quota operations use the OSC
 * implementations in this file.
 */
3038 static struct obd_ops osc_obd_ops = {
3039 .o_owner = THIS_MODULE,
3040 .o_setup = osc_setup,
3041 .o_precleanup = osc_precleanup,
3042 .o_cleanup = osc_cleanup_common,
3043 .o_add_conn = client_import_add_conn,
3044 .o_del_conn = client_import_del_conn,
3045 .o_connect = client_connect_import,
3046 .o_reconnect = osc_reconnect,
3047 .o_disconnect = osc_disconnect,
3048 .o_statfs = osc_statfs,
3049 .o_statfs_async = osc_statfs_async,
3050 .o_create = osc_create,
3051 .o_destroy = osc_destroy,
3052 .o_getattr = osc_getattr,
3053 .o_setattr = osc_setattr,
3054 .o_iocontrol = osc_iocontrol,
3055 .o_set_info_async = osc_set_info_async,
3056 .o_import_event = osc_import_event,
3057 .o_process_config = osc_process_config,
3058 .o_quotactl = osc_quotactl,
/* Global memory-shrinker state: every live client_obd links itself onto
 * osc_shrink_list (under osc_shrink_lock) in osc_setup() and leaves it in
 * osc_cleanup_common(). */
3061 static struct shrinker *osc_cache_shrinker;
3062 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3063 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Compatibility shim for kernels whose struct shrinker lacks separate
 * count/scan callbacks: emulate them with a single shrink entry point
 * that scans first and then returns the count.
 */
3065 #ifndef HAVE_SHRINKER_COUNT
3066 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3068 struct shrink_control scv = {
3069 .nr_to_scan = shrink_param(sc, nr_to_scan),
3070 .gfp_mask = shrink_param(sc, gfp_mask)
3072 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3073 struct shrinker *shrinker = NULL;
3076 (void)osc_cache_shrink_scan(shrinker, &scv);
3078 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: set up caches, register the OSC device type (proc disabled
 * when OSP already registered the shared proc directory), install the
 * cache shrinker and size/create the shared OSC request pool.
 */
3082 static int __init osc_init(void)
3084 bool enable_proc = true;
3085 struct obd_type *type;
3086 unsigned int reqpool_size;
3087 unsigned int reqsize;
3089 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3090 osc_cache_shrink_count, osc_cache_shrink_scan);
3093 /* print an address of _any_ initialized kernel symbol from this
3094 * module, to allow debugging with gdb that doesn't support data
3095 * symbols from modules.*/
3096 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3098 rc = lu_kmem_init(osc_caches);
3102 type = class_search_type(LUSTRE_OSP_NAME);
3103 if (type != NULL && type->typ_procsym != NULL)
3104 enable_proc = false;
3106 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3107 LUSTRE_OSC_NAME, &osc_device_type);
3111 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3113 /* This is obviously too much memory, only prevent overflow here */
3114 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3115 GOTO(out_type, rc = -EINVAL);
/* convert the MB module parameter to bytes */
3117 reqpool_size = osc_reqpool_mem_max << 20;
/* round the per-request size up to the next power of two */
3120 while (reqsize < OST_IO_MAXREQSIZE)
3121 reqsize = reqsize << 1;
3124 * We don't enlarge the request count in OSC pool according to
3125 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3126 * tried after normal allocation failed. So a small OSC pool won't
3127 * cause much performance degression in most of cases.
3129 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3131 atomic_set(&osc_pool_req_count, 0);
3132 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3133 ptlrpc_add_rqs_to_pool);
3135 if (osc_rq_pool != NULL)
/* error path: undo type registration and caches */
3139 class_unregister_type(LUSTRE_OSC_NAME);
3141 lu_kmem_fini(osc_caches);
/*
 * Module exit: tear down everything osc_init() created, in reverse order.
 */
3146 static void __exit osc_exit(void)
3148 remove_shrinker(osc_cache_shrinker);
3149 class_unregister_type(LUSTRE_OSC_NAME);
3150 lu_kmem_fini(osc_caches);
3151 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry points. */
3154 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3155 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3156 MODULE_VERSION(LUSTRE_VERSION_STRING);
3157 MODULE_LICENSE("GPL");
3159 module_init(osc_init);
3160 module_exit(osc_exit);