4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <libcfs/libcfs.h>
37 #include <lprocfs_status.h>
38 #include <lustre_debug.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_ha.h>
42 #include <uapi/linux/lustre/lustre_ioctl.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <uapi/linux/lustre/lustre_param.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
51 #include "osc_internal.h"
/* Global OSC request-pool state: the pool itself, the current request count,
 * and the configured maximum number of pooled requests. */
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
/* read-only module parameter (mode 0444): tunable only at module load time */
59 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-RPC async context for bulk read/write (BRW) requests; stored in
 * req->rq_async_args and recovered by the interpret callback. */
61 struct osc_brw_async_args {
67 struct brw_page **aa_ppga;
68 struct client_obd *aa_cli;
69 struct list_head aa_oaps;
70 struct list_head aa_exts;
/* grant-shrink RPCs reuse the BRW async-args layout */
73 #define osc_grant_args osc_brw_async_args
/* Async context for OST_SETATTR/OST_PUNCH: upcall invoked on completion. */
75 struct osc_setattr_args {
77 obd_enqueue_update_f sa_upcall;
/* Async context for OST_SYNC (fsync) requests. */
81 struct osc_fsync_args {
82 struct osc_object *fa_obj;
84 obd_enqueue_update_f fa_upcall;
/* Async context for OST_LADVISE (fadvise-style hint) requests. */
88 struct osc_ladvise_args {
90 obd_enqueue_update_f la_upcall;
/* Async context for DLM lock enqueue: export, lock type/mode, upcall,
 * LVB buffer, lock handle, and an AGL (asynchronous glimpse lock) flag. */
94 struct osc_enqueue_args {
95 struct obd_export *oa_exp;
96 enum ldlm_type oa_type;
97 enum ldlm_mode oa_mode;
99 osc_enqueue_upcall_f oa_upcall;
101 struct ost_lvb *oa_lvb;
102 struct lustre_handle oa_lockh;
103 unsigned int oa_agl:1;
/* forward declarations for helpers defined later in this file */
106 static void osc_release_ppga(struct brw_page **ppga, size_t count);
107 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into the request's outgoing OST body, converting the obdo to
 * wire format according to the import's negotiated connect data. */
110 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
112 struct ost_body *body;
114 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
117 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: allocate and pack the request, wait for the
 * reply, and copy the returned attributes back into @oa.  The client's
 * BRW size is also reported as the blocksize (OBD_MD_FLBLKSZ). */
120 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
123 struct ptlrpc_request *req;
124 struct ost_body *body;
128 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
132 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
134 ptlrpc_request_free(req);
138 osc_pack_req_body(req, oa);
140 ptlrpc_request_set_replen(req);
/* synchronous send: blocks until the reply arrives */
142 rc = ptlrpc_queue_wait(req);
/* -EPROTO if the server reply is missing/short the OST body */
146 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
148 GOTO(out, rc = -EPROTO);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* report the RPC size as the preferred I/O blocksize */
153 oa->o_blksize = cli_brw_size(exp->exp_obd);
154 oa->o_valid |= OBD_MD_FLBLKSZ;
158 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the attributes in @oa to the OST and
 * copy the (possibly updated) attributes from the reply back into @oa. */
163 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
166 struct ptlrpc_request *req;
167 struct ost_body *body;
/* the group must always be set when modifying objects */
171 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
173 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
177 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
179 ptlrpc_request_free(req);
183 osc_pack_req_body(req, oa);
185 ptlrpc_request_set_replen(req);
187 rc = ptlrpc_queue_wait(req);
191 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193 GOTO(out, rc = -EPROTO);
195 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
199 ptlrpc_req_finished(req);
/* Reply-interpret callback shared by async setattr and punch: unpack the
 * returned obdo into sa->sa_oa and then invoke the caller's upcall with
 * the final result code. */
204 static int osc_setattr_interpret(const struct lu_env *env,
205 struct ptlrpc_request *req,
206 struct osc_setattr_args *sa, int rc)
208 struct ost_body *body;
214 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216 GOTO(out, rc = -EPROTO);
218 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* propagate the RPC status to the waiting caller via its upcall */
221 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs @oa and dispatches the RPC without
 * waiting: if @rqset is NULL the request is fire-and-forget; if it is
 * PTLRPCD_SET the request is handed to ptlrpcd; otherwise it is added to
 * the caller's set.  @upcall/@cookie are delivered to
 * osc_setattr_interpret() on completion. */
225 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
226 obd_enqueue_update_f upcall, void *cookie,
227 struct ptlrpc_request_set *rqset)
229 struct ptlrpc_request *req;
230 struct osc_setattr_args *sa;
235 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
239 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
241 ptlrpc_request_free(req);
245 osc_pack_req_body(req, oa);
247 ptlrpc_request_set_replen(req);
249 /* do mds to ost setattr asynchronously */
251 /* Do not wait for response. */
252 ptlrpcd_add_req(req);
254 req->rq_interpret_reply =
255 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* stash the async context in the request's fixed-size scratch area */
257 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
258 sa = ptlrpc_req_async_args(req);
260 sa->sa_upcall = upcall;
261 sa->sa_cookie = cookie;
263 if (rqset == PTLRPCD_SET)
264 ptlrpcd_add_req(req);
266 ptlrpc_set_add_req(rqset, req);
/* Reply-interpret callback for OST_LADVISE: copy the returned obdo back
 * to the caller's buffer and invoke the completion upcall. */
272 static int osc_ladvise_interpret(const struct lu_env *env,
273 struct ptlrpc_request *req,
276 struct osc_ladvise_args *la = arg;
277 struct ost_body *body;
283 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
285 GOTO(out, rc = -EPROTO);
/* struct copy of the server-returned attributes */
287 *la->la_oa = body->oa;
289 rc = la->la_upcall(la->la_cookie, rc);
294 * If rqset is NULL, do not wait for response. Upcall and cookie could also
295 * be NULL in this case
/* Send an OST_LADVISE RPC carrying @num_advise lu_ladvise records from
 * @ladvise_hdr.  Dispatch semantics match osc_setattr_async(): NULL
 * rqset = fire-and-forget, PTLRPCD_SET = ptlrpcd, else add to @rqset. */
297 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
298 struct ladvise_hdr *ladvise_hdr,
299 obd_enqueue_update_f upcall, void *cookie,
300 struct ptlrpc_request_set *rqset)
302 struct ptlrpc_request *req;
303 struct ost_body *body;
304 struct osc_ladvise_args *la;
306 struct lu_ladvise *req_ladvise;
307 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
308 int num_advise = ladvise_hdr->lah_count;
309 struct ladvise_hdr *req_ladvise_hdr;
312 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* size the variable-length advice array before packing */
316 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
317 num_advise * sizeof(*ladvise));
318 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
320 ptlrpc_request_free(req);
/* ladvise is served by the OST I/O service, so use its portal/timeout */
323 req->rq_request_portal = OST_IO_PORTAL;
324 ptlrpc_at_set_req_timeout(req);
326 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
328 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
331 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
332 &RMF_OST_LADVISE_HDR);
333 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
335 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
336 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
337 ptlrpc_request_set_replen(req);
340 /* Do not wait for response. */
341 ptlrpcd_add_req(req);
345 req->rq_interpret_reply = osc_ladvise_interpret;
346 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
347 la = ptlrpc_req_async_args(req);
349 la->la_upcall = upcall;
350 la->la_cookie = cookie;
352 if (rqset == PTLRPCD_SET)
353 ptlrpcd_add_req(req);
355 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo-client objects here (see
 * the fid_seq_is_echo assertion); regular object creation goes through
 * the OSP on the MDS. */
360 static int osc_create(const struct lu_env *env, struct obd_export *exp,
363 struct ptlrpc_request *req;
364 struct ost_body *body;
369 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
370 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
372 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
374 GOTO(out, rc = -ENOMEM);
376 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
378 ptlrpc_request_free(req);
382 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
385 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
387 ptlrpc_request_set_replen(req);
389 rc = ptlrpc_queue_wait(req);
393 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
395 GOTO(out_req, rc = -EPROTO);
397 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
398 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
400 oa->o_blksize = cli_brw_size(exp->exp_obd);
401 oa->o_valid |= OBD_MD_FLBLKSZ;
403 CDEBUG(D_HA, "transno: %lld\n",
404 lustre_msg_get_transno(req->rq_repmsg));
406 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate/hole-punch).  The extent to punch is
 * carried in @oa; completion is reported through osc_setattr_interpret()
 * to @upcall/@cookie.  Dispatch semantics match osc_setattr_async(). */
411 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
412 obd_enqueue_update_f upcall, void *cookie,
413 struct ptlrpc_request_set *rqset)
415 struct ptlrpc_request *req;
416 struct osc_setattr_args *sa;
417 struct ost_body *body;
421 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
425 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
427 ptlrpc_request_free(req);
430 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
431 ptlrpc_at_set_req_timeout(req);
433 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
435 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
437 ptlrpc_request_set_replen(req);
/* punch reuses the setattr completion path */
439 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
440 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
441 sa = ptlrpc_req_async_args(req);
443 sa->sa_upcall = upcall;
444 sa->sa_cookie = cookie;
445 if (rqset == PTLRPCD_SET)
446 ptlrpcd_add_req(req);
448 ptlrpc_set_add_req(rqset, req);
/* Reply-interpret callback for OST_SYNC: copy the returned obdo back,
 * refresh the osc object's cached blocks attribute from the reply under
 * the cl_object attribute lock, then invoke the completion upcall. */
453 static int osc_sync_interpret(const struct lu_env *env,
454 struct ptlrpc_request *req,
457 struct osc_fsync_args *fa = arg;
458 struct ost_body *body;
459 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
460 unsigned long valid = 0;
461 struct cl_object *obj;
467 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
469 CERROR("can't unpack ost_body\n");
470 GOTO(out, rc = -EPROTO);
473 *fa->fa_oa = body->oa;
474 obj = osc2cl(fa->fa_obj);
476 /* Update osc object's blocks attribute */
477 cl_object_attr_lock(obj);
478 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
479 attr->cat_blocks = body->oa.o_blocks;
484 cl_object_attr_update(env, obj, attr, valid);
485 cl_object_attr_unlock(obj);
488 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC (fsync) for @obj.  The byte range is encoded in
 * @oa's size/blocks fields (see comment at packing).  Dispatch semantics
 * match osc_setattr_async(). */
492 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
493 obd_enqueue_update_f upcall, void *cookie,
494 struct ptlrpc_request_set *rqset)
496 struct obd_export *exp = osc_export(obj);
497 struct ptlrpc_request *req;
498 struct ost_body *body;
499 struct osc_fsync_args *fa;
503 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
507 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
509 ptlrpc_request_free(req);
513 /* overload the size and blocks fields in the oa with start/end */
514 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
516 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
518 ptlrpc_request_set_replen(req);
519 req->rq_interpret_reply = osc_sync_interpret;
521 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
522 fa = ptlrpc_req_async_args(req);
525 fa->fa_upcall = upcall;
526 fa->fa_cookie = cookie;
528 if (rqset == PTLRPCD_SET)
529 ptlrpcd_add_req(req);
531 ptlrpc_set_add_req(rqset, req);
536 /* Find and cancel locally locks matched by @mode in the resource found by
537 * @objid. Found locks are added into @cancel list. Returns the amount of
538 * locks added to @cancels list. */
539 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
540 struct list_head *cancels,
541 enum ldlm_mode mode, __u64 lock_flags)
543 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
544 struct ldlm_res_id res_id;
545 struct ldlm_resource *res;
549 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
550 * export) but disabled through procfs (flag in NS).
552 * This distinguishes from a case when ELC is not supported originally,
553 * when we still want to cancel locks in advance and just cancel them
554 * locally, without sending any RPC. */
555 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* map the object id onto its DLM resource and collect matching locks */
558 ostid_build_res_name(&oa->o_oi, &res_id);
559 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
563 LDLM_RESOURCE_ADDREF(res);
564 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565 lock_flags, 0, NULL);
566 LDLM_RESOURCE_DELREF(res);
567 ldlm_resource_putref(res);
/* Completion callback for OST_DESTROY: release one in-flight destroy
 * slot and wake anyone throttled in osc_destroy(). */
571 static int osc_destroy_interpret(const struct lu_env *env,
572 struct ptlrpc_request *req, void *data,
575 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
577 atomic_dec(&cli->cl_destroy_in_flight);
578 wake_up(&cli->cl_destroy_waitq);
/* Try to claim an in-flight destroy slot, bounded by the max RPCs in
 * flight.  Optimistically increments the counter; on failure it is
 * decremented again, and a waiter is woken if another destroy completed
 * in between (the counters race without a lock, hence the re-check). */
582 static int osc_can_send_destroy(struct client_obd *cli)
584 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
585 cli->cl_max_rpcs_in_flight) {
586 /* The destroy request can be sent */
589 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
590 cli->cl_max_rpcs_in_flight) {
592 * The counter has been modified between the two atomic
595 wake_up(&cli->cl_destroy_waitq);
/* Destroy an OST object.  First cancels covering PW locks locally
 * (discarding dirty data) and piggybacks those cancels on the destroy
 * RPC via ELC; throttles so no more than cl_max_rpcs_in_flight destroys
 * are outstanding; then sends the RPC without waiting for the reply. */
600 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
603 struct client_obd *cli = &exp->exp_obd->u.cli;
604 struct ptlrpc_request *req;
605 struct ost_body *body;
606 struct list_head cancels = LIST_HEAD_INIT(cancels);
611 CDEBUG(D_INFO, "oa NULL\n");
/* gather local PW locks on this object for early lock cancel (ELC) */
615 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
616 LDLM_FL_DISCARD_DATA);
618 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
620 ldlm_lock_list_put(&cancels, l_bl_ast, count);
624 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
627 ptlrpc_request_free(req);
631 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
632 ptlrpc_at_set_req_timeout(req);
634 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
636 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
638 ptlrpc_request_set_replen(req);
640 req->rq_interpret_reply = osc_destroy_interpret;
641 if (!osc_can_send_destroy(cli)) {
642 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
645 * Wait until the number of on-going destroy RPCs drops
646 * under max_rpc_in_flight
648 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
649 osc_can_send_destroy(cli), &lwi);
651 ptlrpc_req_finished(req);
656 /* Do not wait for response */
657 ptlrpcd_add_req(req);
/* Fill @oa's grant-accounting fields (o_dirty, o_undirty, o_grant,
 * o_dropped) under cl_loi_list_lock so the server learns this client's
 * cache state.  Sanity CERRORs fire if the dirty counters are
 * inconsistent with the configured limits. */
661 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
664 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
666 LASSERT(!(oa->o_valid & bits));
669 spin_lock(&cli->cl_loi_list_lock);
/* with GRANT_PARAM the server wants dirty in grant units, else bytes */
670 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
671 oa->o_dirty = cli->cl_dirty_grant;
673 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
674 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
675 cli->cl_dirty_max_pages)) {
676 CERROR("dirty %lu - %lu > dirty_max %lu\n",
677 cli->cl_dirty_pages, cli->cl_dirty_transit,
678 cli->cl_dirty_max_pages);
680 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
681 atomic_long_read(&obd_dirty_transit_pages) >
682 (long)(obd_max_dirty_pages + 1))) {
683 /* The atomic_read() allowing the atomic_inc() are
684 * not covered by a lock thus they may safely race and trip
685 * this CERROR() unless we add in a small fudge factor (+1). */
686 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
687 cli_name(cli), atomic_long_read(&obd_dirty_pages),
688 atomic_long_read(&obd_dirty_transit_pages),
689 obd_max_dirty_pages);
691 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
693 CERROR("dirty %lu - dirty_max %lu too big???\n",
694 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
697 unsigned long nrpages;
/* ask for enough grant to keep a full pipeline of RPCs dirty */
699 nrpages = cli->cl_max_pages_per_rpc;
700 nrpages *= cli->cl_max_rpcs_in_flight + 1;
701 nrpages = max(nrpages, cli->cl_dirty_max_pages);
702 oa->o_undirty = nrpages << PAGE_SHIFT;
703 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
707 /* take extent tax into account when asking for more
709 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
710 cli->cl_max_extent_pages;
711 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
714 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* report (and reset) grant lost since the last announcement */
715 oa->o_dropped = cli->cl_lost_grant;
716 cli->cl_lost_grant = 0;
717 spin_unlock(&cli->cl_loi_list_lock);
718 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
719 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Re-arm the next grant-shrink deadline, cl_grant_shrink_interval
 * seconds from now. */
722 void osc_update_next_shrink(struct client_obd *cli)
724 cli->cl_next_shrink_grant =
725 cfs_time_shift(cli->cl_grant_shrink_interval);
726 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
727 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the grant
 * lock (cl_loi_list_lock). */
730 static void __osc_update_grant(struct client_obd *cli, u64 grant)
732 spin_lock(&cli->cl_loi_list_lock);
733 cli->cl_avail_grant += grant;
734 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
737 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
739 if (body->oa.o_valid & OBD_MD_FLGRANT) {
740 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
741 __osc_update_grant(cli, body->oa.o_grant);
/* forward declaration: used by the grant-shrink path below, defined later */
745 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
746 u32 keylen, void *key,
747 u32 vallen, void *val,
748 struct ptlrpc_request_set *set);
/* Completion callback for a grant-shrink set_info RPC: on failure the
 * locally-deducted grant is restored; on success the reply body's grant
 * is folded back in via osc_update_grant(). */
750 static int osc_shrink_grant_interpret(const struct lu_env *env,
751 struct ptlrpc_request *req,
754 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
755 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
756 struct ost_body *body;
/* error path: give back the grant we tentatively shrank */
759 __osc_update_grant(cli, oa->o_grant);
763 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
765 osc_update_grant(cli, body);
/* Piggyback a grant shrink on an outgoing RPC: give back a quarter of
 * the available grant in @oa, flag it with OBD_FL_SHRINK_GRANT, and
 * re-arm the shrink timer. */
771 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
773 spin_lock(&cli->cl_loi_list_lock);
774 oa->o_grant = cli->cl_avail_grant / 4;
775 cli->cl_avail_grant -= oa->o_grant;
776 spin_unlock(&cli->cl_loi_list_lock);
777 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
778 oa->o_valid |= OBD_MD_FLFLAGS;
781 oa->o_flags |= OBD_FL_SHRINK_GRANT;
782 osc_update_next_shrink(cli);
785 /* Shrink the current grant, either from some large amount to enough for a
786 * full set of in-flight RPCs, or if we have already shrunk to that limit
787 * then to enough for a single RPC. This avoids keeping more grant than
788 * needed, and avoids shrinking the grant piecemeal. */
789 static int osc_shrink_grant(struct client_obd *cli)
791 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
792 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
794 spin_lock(&cli->cl_loi_list_lock);
/* already at/below the pipeline target: fall back to one-RPC's worth */
795 if (cli->cl_avail_grant <= target_bytes)
796 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
797 spin_unlock(&cli->cl_loi_list_lock);
799 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink the available grant down to @target_bytes by returning the
 * difference to the server through a KEY_GRANT_SHRINK set_info RPC.
 * No-op if the target is above what we hold; the target is clamped to
 * at least one RPC's worth of grant. */
802 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
805 struct ost_body *body;
808 spin_lock(&cli->cl_loi_list_lock);
809 /* Don't shrink if we are already above or below the desired limit
810 * We don't want to shrink below a single RPC, as that will negatively
811 * impact block allocation and long-term performance. */
812 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
813 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
815 if (target_bytes >= cli->cl_avail_grant) {
816 spin_unlock(&cli->cl_loi_list_lock);
819 spin_unlock(&cli->cl_loi_list_lock);
/* announce cache state, then deduct the grant being returned */
825 osc_announce_cached(cli, &body->oa, 0);
827 spin_lock(&cli->cl_loi_list_lock);
828 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
829 cli->cl_avail_grant = target_bytes;
830 spin_unlock(&cli->cl_loi_list_lock);
831 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
832 body->oa.o_valid |= OBD_MD_FLFLAGS;
833 body->oa.o_flags = 0;
835 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
836 osc_update_next_shrink(cli);
838 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
839 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
840 sizeof(*body), body, NULL);
/* on failure, restore the grant we deducted above */
842 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: requires the
 * server to support GRANT_SHRINK, the shrink deadline to have (nearly)
 * passed, the import to be FULL, and more grant held than one RPC needs. */
847 static int osc_should_shrink_grant(struct client_obd *client)
849 cfs_time_t time = cfs_time_current();
850 cfs_time_t next_shrink = client->cl_next_shrink_grant;
852 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
853 OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow firing slightly early (5 ticks) to avoid missing the window */
856 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
857 /* Get the current RPC size directly, instead of going via:
858 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
859 * Keep comment here so that it can be found by searching. */
860 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
862 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
863 client->cl_avail_grant > brw_size)
/* not worth shrinking now; push the deadline forward */
866 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client registered on the
 * grant-shrink list and shrink those that are due. */
871 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
873 struct client_obd *client;
875 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
876 if (osc_should_shrink_grant(client))
877 osc_shrink_grant(client);
/* Register @client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline. */
882 static int osc_add_shrink_grant(struct client_obd *client)
886 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
888 osc_grant_shrink_grant_cb, NULL,
889 &client->cl_grant_shrink_list);
891 CERROR("add grant client %s error %d\n", cli_name(client), rc);
894 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
895 osc_update_next_shrink(client);
/* Unregister @client from the periodic grant-shrink machinery. */
899 static int osc_del_shrink_grant(struct client_obd *client)
901 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the server's connect reply @ocd: compute
 * the initial available grant, and if the server advertises GRANT_PARAM,
 * derive the extent tax, chunk size, chunk-aligned max_pages_per_rpc and
 * the maximum extent size.  Finally register for periodic grant shrink
 * if the server supports it. */
905 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
908 * ocd_grant is the total grant amount we're expect to hold: if we've
909 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
910 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
913 * race is tolerable here: if we're evicted, but imp_state already
914 * left EVICTED state, then cl_dirty_pages must be 0 already.
916 spin_lock(&cli->cl_loi_list_lock);
917 cli->cl_avail_grant = ocd->ocd_grant;
918 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
919 cli->cl_avail_grant -= cli->cl_reserved_grant;
920 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
921 cli->cl_avail_grant -= cli->cl_dirty_grant;
923 cli->cl_avail_grant -=
924 cli->cl_dirty_pages << PAGE_SHIFT;
927 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
931 /* overhead for each extent insertion */
932 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
933 /* determine the appropriate chunk size used by osc_extent. */
934 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
935 ocd->ocd_grant_blkbits);
936 /* max_pages_per_rpc must be chunk aligned */
937 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
938 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
939 ~chunk_mask) & chunk_mask;
940 /* determine maximum extent size, in #pages */
941 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
942 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
943 if (cli->cl_max_extent_pages == 0)
944 cli->cl_max_extent_pages = 1;
/* server without GRANT_PARAM: fall back to page-sized chunks */
946 cli->cl_grant_extent_tax = 0;
947 cli->cl_chunkbits = PAGE_SHIFT;
948 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
950 spin_unlock(&cli->cl_loi_list_lock);
952 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
953 "chunk bits: %d cl_max_extent_pages: %d\n",
955 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
956 cli->cl_max_extent_pages);
958 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
959 list_empty(&cli->cl_grant_shrink_list))
960 osc_add_shrink_grant(cli);
963 /* We assume that the reason this OSC got a short read is because it read
964 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
965 * via the LOV, and it _knows_ it's reading inside the file, it's just that
966 * this stripe never got written at or beyond this stripe offset yet. */
967 static void handle_short_read(int nob_read, size_t page_count,
968 struct brw_page **pga)
973 /* skip bytes read OK */
974 while (nob_read > 0) {
975 LASSERT (page_count > 0);
977 if (pga[i]->count > nob_read) {
978 /* EOF inside this page */
/* zero the tail of the partially-filled page */
979 ptr = kmap(pga[i]->pg) +
980 (pga[i]->off & ~PAGE_MASK);
981 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
988 nob_read -= pga[i]->count;
993 /* zero remaining pages */
994 while (page_count-- > 0) {
995 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
996 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return codes in a BRW_WRITE reply: fail on a
 * missing/short RC vector, on any negative per-niobuf rc, on any nonzero
 * (unexpected) rc, or on a bulk-transfer byte count mismatch. */
1002 static int check_write_rcs(struct ptlrpc_request *req,
1003 int requested_nob, int niocount,
1004 size_t page_count, struct brw_page **pga)
1009 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1010 sizeof(*remote_rcs) *
1012 if (remote_rcs == NULL) {
1013 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1017 /* return error if any niobuf was in error */
1018 for (i = 0; i < niocount; i++) {
1019 if ((int)remote_rcs[i] < 0)
1020 return(remote_rcs[i]);
1022 if (remote_rcs[i] != 0) {
1023 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1024 i, remote_rcs[i], req);
/* the bulk layer must have moved exactly the bytes we asked for */
1029 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1030 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1031 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf when they are file-contiguous
 * (p1 ends where p2 starts) and their flags differ only in bits known
 * to be safe to combine. */
1038 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1040 if (p1->flag != p2->flag) {
1041 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1042 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1043 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1045 /* warn if we try to combine flags that we don't know to be
1046 * safe to combine */
1047 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1048 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1049 "report this at https://jira.hpdd.intel.com/\n",
1050 p1->flag, p2->flag);
1055 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of @pga using the
 * hash algorithm selected by @cksum_type.  Contains fault-injection
 * hooks: on read, optionally corrupt the first page before hashing; on
 * write, optionally return a deliberately wrong checksum (data left
 * intact so a resend is still correct). */
1058 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1059 struct brw_page **pga, int opc,
1060 enum cksum_types cksum_type)
1064 struct cfs_crypto_hash_desc *hdesc;
1065 unsigned int bufsize;
1066 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1068 LASSERT(pg_count > 0);
1070 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1071 if (IS_ERR(hdesc)) {
1072 CERROR("Unable to initialize checksum hash %s\n",
1073 cfs_crypto_hash_name(cfs_alg));
1074 return PTR_ERR(hdesc);
1077 while (nob > 0 && pg_count > 0) {
/* the final page may be hashed only partially (count > nob) */
1078 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1080 /* corrupt the data before we compute the checksum, to
1081 * simulate an OST->client data error */
1082 if (i == 0 && opc == OST_READ &&
1083 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1084 unsigned char *ptr = kmap(pga[i]->pg);
1085 int off = pga[i]->off & ~PAGE_MASK;
1087 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1090 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1091 pga[i]->off & ~PAGE_MASK,
1093 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1094 (int)(pga[i]->off & ~PAGE_MASK));
1096 nob -= pga[i]->count;
1101 bufsize = sizeof(cksum);
1102 cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1104 /* For sending we only compute the wrong checksum instead
1105 * of corrupting the data so it is still correct on a redo */
1106 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a bulk read/write (BRW) RPC for @page_count pages in @pga.
 * Allocates the request (writes come from the shared osc_rq_pool),
 * counts mergeable pages into niobufs, sets up the bulk descriptor,
 * packs obdo/ioobj/niobuf buffers, announces cached/dirty state and
 * grant, and (optionally) computes the bulk checksum.  On success the
 * prepared request is returned through @reqp with the async-args
 * context initialized; @resend marks the RPC with OBD_FL_RECOV_RESEND. */
1113 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1114 u32 page_count, struct brw_page **pga,
1115 struct ptlrpc_request **reqp, int resend)
1117 struct ptlrpc_request *req;
1118 struct ptlrpc_bulk_desc *desc;
1119 struct ost_body *body;
1120 struct obd_ioobj *ioobj;
1121 struct niobuf_remote *niobuf;
1122 int niocount, i, requested_nob, opc, rc;
1123 struct osc_brw_async_args *aa;
1124 struct req_capsule *pill;
1125 struct brw_page *pg_prev;
/* fault-injection points for testing the resend paths */
1128 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1129 RETURN(-ENOMEM); /* Recoverable */
1130 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1131 RETURN(-EINVAL); /* Fatal */
1133 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes allocate from the preallocated pool so dirty flush can
 * make progress under memory pressure */
1135 req = ptlrpc_request_alloc_pool(cli->cl_import,
1137 &RQF_OST_BRW_WRITE);
1140 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* one niobuf per run of mergeable pages */
1145 for (niocount = i = 1; i < page_count; i++) {
1146 if (!can_merge_pages(pga[i - 1], pga[i]))
1150 pill = &req->rq_pill;
1151 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1153 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1154 niocount * sizeof(*niobuf));
1156 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1158 ptlrpc_request_free(req);
1161 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1162 ptlrpc_at_set_req_timeout(req);
1163 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1165 req->rq_no_retry_einprogress = 1;
1167 desc = ptlrpc_prep_bulk_imp(req, page_count,
1168 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1169 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1170 PTLRPC_BULK_PUT_SINK) |
1171 PTLRPC_BULK_BUF_KIOV,
1173 &ptlrpc_bulk_kiov_pin_ops);
1176 GOTO(out, rc = -ENOMEM);
1177 /* NB request now owns desc and will free it when it gets freed */
1179 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1180 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1181 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1182 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1184 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1186 obdo_to_ioobj(oa, ioobj);
1187 ioobj->ioo_bufcnt = niocount;
1188 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1189 * that might be send for this request. The actual number is decided
1190 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1191 * "max - 1" for old client compatibility sending "0", and also so the
1192 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1193 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1194 LASSERT(page_count > 0);
1196 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1197 struct brw_page *pg = pga[i];
1198 int poff = pg->off & ~PAGE_MASK;
1200 LASSERT(pg->count > 0);
1201 /* make sure there is no gap in the middle of page array */
1202 LASSERTF(page_count == 1 ||
1203 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1204 ergo(i > 0 && i < page_count - 1,
1205 poff == 0 && pg->count == PAGE_SIZE) &&
1206 ergo(i == page_count - 1, poff == 0)),
1207 "i: %d/%d pg: %p off: %llu, count: %u\n",
1208 i, page_count, pg, pg->off, pg->count);
1209 LASSERTF(i == 0 || pg->off > pg_prev->off,
1210 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1211 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1213 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1214 pg_prev->pg, page_private(pg_prev->pg),
1215 pg_prev->pg->index, pg_prev->off);
/* SRVLOCK must be consistent across the whole RPC */
1216 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1217 (pg->flag & OBD_BRW_SRVLOCK));
1219 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1220 requested_nob += pg->count;
/* extend the previous niobuf when contiguous, else start a new one */
1222 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1224 niobuf->rnb_len += pg->count;
1226 niobuf->rnb_offset = pg->off;
1227 niobuf->rnb_len = pg->count;
1228 niobuf->rnb_flags = pg->flag;
/* sanity: we filled exactly niocount niobufs */
1233 LASSERTF((void *)(niobuf - niocount) ==
1234 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1235 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1236 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1238 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1240 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1241 body->oa.o_valid |= OBD_MD_FLFLAGS;
1242 body->oa.o_flags = 0;
1244 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1247 if (osc_should_shrink_grant(cli))
1248 osc_shrink_grant_local(cli, &body->oa);
1250 /* size[REQ_REC_OFF] still sizeof (*body) */
1251 if (opc == OST_WRITE) {
1252 if (cli->cl_checksum &&
1253 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1254 /* store cl_cksum_type in a local variable since
1255 * it can be changed via lprocfs */
1256 enum cksum_types cksum_type = cli->cl_cksum_type;
1258 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1259 body->oa.o_flags = 0;
1261 body->oa.o_flags |= cksum_type_pack(cksum_type);
1262 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1263 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1267 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1269 /* save this in 'oa', too, for later checking */
1270 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271 oa->o_flags |= cksum_type_pack(cksum_type);
1273 /* clear out the checksum flag, in case this is a
1274 * resend but cl_checksum is no longer set. b=11238 */
1275 oa->o_valid &= ~OBD_MD_FLCKSUM;
1277 oa->o_cksum = body->oa.o_cksum;
1278 /* 1 RC per niobuf */
1279 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1280 sizeof(__u32) * niocount);
1282 if (cli->cl_checksum &&
1283 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1284 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1285 body->oa.o_flags = 0;
1286 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1287 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1290 /* Client cksum has been already copied to wire obdo in previous
1291 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1292 * resent due to cksum error, this will allow Server to
1293 * check+dump pages on its side */
1295 ptlrpc_request_set_replen(req);
1297 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1298 aa = ptlrpc_req_async_args(req);
1300 aa->aa_requested_nob = requested_nob;
1301 aa->aa_nio_count = niocount;
1302 aa->aa_page_count = page_count;
1306 INIT_LIST_HEAD(&aa->aa_oaps);
1309 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1310 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1311 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1312 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1316 ptlrpc_req_finished(req);
/* Scratch buffer for the path of the page-dump file written on checksum
 * errors (see dump_all_bulk_pages() below).  File-scope so the PATH_MAX
 * buffer is not placed on the kernel stack.
 * NOTE(review): no serialization of this buffer is visible here —
 * concurrent checksum errors would race on it; confirm callers serialize. */
1320 char dbgcksum_file_name[PATH_MAX];
/* Dump the raw contents of every page of a bulk I/O to a debug file so a
 * checksum mismatch can be analyzed offline.
 *
 * \param oa          wire obdo; its parent FID fields (if OBD_MD_FLFID is
 *                    set in o_valid) are encoded into the dump file name
 * \param page_count  number of entries in \a pga
 * \param pga         the brw_page array whose pages are written out
 * \param server_cksum / client_cksum  the mismatching checksums, also
 *                    encoded into the file name so each error gets a
 *                    distinct dump
 *
 * The file is created O_CREAT|O_EXCL, so only the first error for a given
 * file/range/checksum combination produces a dump; resends of the same
 * error hit -EEXIST and are only logged.  (Original lines appear elided
 * here; the -EEXIST branch is inferred from the CDEBUG/CERROR split —
 * TODO confirm against the full source.) */
1322 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1323 struct brw_page **pga, __u32 server_cksum,
1332 /* will only keep dump of pages on first error for the same range in
1333 * file/fid, not during the resends/retries. */
1334 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1335 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
/* Fall back to the compiled-in default directory when the debug path
 * has been configured as "NONE". */
1336 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1337 libcfs_debug_file_path_arr :
1338 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
/* Parent FID components are only valid when OBD_MD_FLFID is set. */
1339 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1340 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1341 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1343 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1344 client_cksum, server_cksum);
/* O_EXCL: refuse to overwrite an existing dump for the same error. */
1345 filp = filp_open(dbgcksum_file_name,
1346 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1350 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1351 "checksum error: rc = %d\n", dbgcksum_file_name,
1354 CERROR("%s: can't open to dump pages with checksum "
1355 "error: rc = %d\n", dbgcksum_file_name, rc);
/* Write each bulk page in order.  kmap() is needed because the brw
 * pages may be highmem; the matching kunmap() is in elided lines —
 * TODO confirm it is present on all paths in the full source. */
1361 for (i = 0; i < page_count; i++) {
1362 len = pga[i]->count;
1363 buf = kmap(pga[i]->pg);
1365 rc = vfs_write(filp, (__force const char __user *)buf,
1368 CERROR("%s: wanted to write %u but got %d "
1369 "error\n", dbgcksum_file_name, len, rc);
1374 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1375 dbgcksum_file_name, rc);
/* Force the dump to stable storage before closing, so the evidence
 * survives a subsequent client crash. */
1381 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1383 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1384 filp_close(filp, NULL);
/* Verify the server-reported checksum of a completed bulk write against the
 * checksum the client computed before sending.
 *
 * \param oa           wire obdo from the server reply (FID fields used in
 *                     the error message)
 * \param peer         NID of the peer, for the error message
 * \param client_cksum checksum computed at send time
 * \param server_cksum checksum the OST computed on receipt
 * \param aa           async args holding the page array and original obdo
 *
 * On mismatch the pages are re-checksummed locally to classify the failure
 * (wrong checksum type negotiated, client-side modification after send —
 * typically mmap I/O, or corruption in transit) and a rate-unlimited
 * console error is emitted.  Return value lines are elided in this
 * extraction; presumably non-zero on genuine mismatch — TODO confirm. */
1389 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1390 __u32 client_cksum, __u32 server_cksum,
1391 struct osc_brw_async_args *aa)
1395 enum cksum_types cksum_type;
/* Fast path: checksums agree, nothing to do. */
1397 if (server_cksum == client_cksum) {
1398 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Optionally dump the raw pages for offline analysis (tunable via the
 * cl_checksum_dump knob). */
1402 if (aa->aa_cli->cl_checksum_dump)
1403 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1404 server_cksum, client_cksum);
/* Re-checksum the pages now, with the type the server reported, to
 * distinguish where the data changed. */
1406 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1408 new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1409 aa->aa_ppga, OST_WRITE, cksum_type);
/* Classify the mismatch for the console message below. */
1411 if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1412 msg = "the server did not use the checksum type specified in "
1413 "the original request - likely a protocol problem";
1414 else if (new_cksum == server_cksum)
1415 msg = "changed on the client after we checksummed it - "
1416 "likely false positive due to mmap IO (bug 11742)";
1417 else if (new_cksum == client_cksum)
1418 msg = "changed in transit before arrival at OST";
1420 msg = "changed in transit AND doesn't match the original - "
1421 "likely false positive due to mmap IO (bug 11742)";
1423 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1424 DFID " object "DOSTID" extent [%llu-%llu], original "
1425 "client csum %x (type %x), server csum %x (type %x),"
1426 " client csum now %x\n",
1427 aa->aa_cli->cl_import->imp_obd->obd_name,
1428 msg, libcfs_nid2str(peer->nid),
1429 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1430 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1431 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1432 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1433 aa->aa_ppga[aa->aa_page_count - 1]->off +
1434 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1435 client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1436 server_cksum, cksum_type, new_cksum);
/* Finish processing of a bulk read/write RPC reply: unpack the reply body,
 * update quota and grant state, verify checksums, and (for reads) handle
 * short transfers.
 *
 * \param req  the completed BRW request
 * \param rc   on entry: number of bytes transferred (>= 0) or an error
 * \retval     0 or a (possibly new) negative errno; several elided paths
 *             appear to convert failures to -EAGAIN for retry
 */
1440 /* Note rc enters this function as number of bytes transferred */
1441 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1443 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1444 const struct lnet_process_id *peer =
1445 &req->rq_import->imp_connection->c_peer;
1446 struct client_obd *cli = aa->aa_cli;
1447 struct ost_body *body;
1448 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags below); every other
 * error bails out early. */
1451 if (rc < 0 && rc != -EDQUOT) {
1452 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1456 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1457 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1459 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1463 /* set/clear over quota flag for a uid/gid/projid */
1464 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1465 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1466 unsigned qid[LL_MAXQUOTAS] = {
1467 body->oa.o_uid, body->oa.o_gid,
1468 body->oa.o_projid };
1469 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1470 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1471 body->oa.o_valid, body->oa.o_flags);
1472 osc_quota_setdq(cli, qid, body->oa.o_valid,
/* Refresh the client's grant accounting from the reply. */
1476 osc_update_grant(cli, body);
1481 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1482 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* ---- write-specific completion ---- */
1484 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* A positive byte count makes no sense for a write reply. */
1486 CERROR("Unexpected +ve rc %d\n", rc);
1489 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1491 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Verify the server's checksum against ours if we sent one. */
1494 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1495 check_write_checksum(&body->oa, peer, client_cksum,
1496 body->oa.o_cksum, aa))
/* Per-niobuf return codes from the server. */
1499 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1500 aa->aa_page_count, aa->aa_ppga);
1504 /* The rest of this function executes only for OST_READs */
1506 /* if unwrap_bulk failed, return -EAGAIN to retry */
1507 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1509 GOTO(out, rc = -EAGAIN);
/* Sanity: server cannot legitimately return more than we asked for,
 * and its count must match what the bulk layer saw arrive. */
1511 if (rc > aa->aa_requested_nob) {
1512 CERROR("Unexpected rc %d (%d requested)\n", rc,
1513 aa->aa_requested_nob);
1517 if (rc != req->rq_bulk->bd_nob_transferred) {
1518 CERROR ("Unexpected rc %d (%d transferred)\n",
1519 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill / truncate the tail pages accordingly. */
1523 if (rc < aa->aa_requested_nob)
1524 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Read-side checksum verification, if the server sent one. */
1526 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1527 static int cksum_counter;
1528 u32 server_cksum = body->oa.o_cksum;
1531 enum cksum_types cksum_type;
1533 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1534 body->oa.o_flags : 0);
1535 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1536 aa->aa_ppga, OST_READ,
/* Identify an intermediate LNet router, if any, for the message. */
1539 if (peer->nid != req->rq_bulk->bd_sender) {
1541 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1544 if (server_cksum != client_cksum) {
1545 struct ost_body *clbody;
1546 u32 page_count = aa->aa_page_count;
1548 clbody = req_capsule_client_get(&req->rq_pill,
1550 if (cli->cl_checksum_dump)
1551 dump_all_bulk_pages(&clbody->oa, page_count,
1552 aa->aa_ppga, server_cksum,
1555 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1556 "%s%s%s inode "DFID" object "DOSTID
1557 " extent [%llu-%llu], client %x, "
1558 "server %x, cksum_type %x\n",
1559 req->rq_import->imp_obd->obd_name,
1560 libcfs_nid2str(peer->nid),
1562 clbody->oa.o_valid & OBD_MD_FLFID ?
1563 clbody->oa.o_parent_seq : 0ULL,
1564 clbody->oa.o_valid & OBD_MD_FLFID ?
1565 clbody->oa.o_parent_oid : 0,
1566 clbody->oa.o_valid & OBD_MD_FLFID ?
1567 clbody->oa.o_parent_ver : 0,
1568 POSTID(&body->oa.o_oi),
1569 aa->aa_ppga[0]->off,
1570 aa->aa_ppga[page_count-1]->off +
1571 aa->aa_ppga[page_count-1]->count - 1,
1572 client_cksum, server_cksum,
/* Record the locally computed checksum for the caller. */
1575 aa->aa_oa->o_cksum = client_cksum;
1579 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1582 } else if (unlikely(client_cksum)) {
/* We asked for a checksum but the server did not send one;
 * log with exponential backoff (power-of-two counter test). */
1583 static int cksum_missed;
1586 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1587 CERROR("Checksum %u requested from %s but not sent\n",
1588 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy server-returned attributes back into the local obdo. */
1594 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1595 aa->aa_oa, &body->oa);
/* Rebuild and resend a BRW RPC after a recoverable failure (e.g. the server
 * replied -EINPROGRESS).  The new request takes over the page array (pga),
 * async pages (oaps) and extents from the failed one.
 *
 * \param request  the failed request
 * \param aa       its async args (pages, extents, original obdo)
 * \param rc       the recoverable error, used only for log level selection
 * \retval         0 on successful resubmission, negative errno otherwise
 *                 (the early-return paths are in elided lines)
 */
1600 static int osc_brw_redo_request(struct ptlrpc_request *request,
1601 struct osc_brw_async_args *aa, int rc)
1603 struct ptlrpc_request *new_req;
1604 struct osc_brw_async_args *new_aa;
1605 struct osc_async_page *oap;
/* -EINPROGRESS is expected and resent quietly; anything else is an error. */
1608 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1609 "redo for recoverable error %d", rc);
/* Build a fresh request of the same type over the same pages. */
1611 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1612 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1613 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1614 aa->aa_ppga, &new_req, 1);
/* Abort the resend if any page's owner was interrupted meanwhile. */
1618 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1619 if (oap->oap_request != NULL) {
1620 LASSERTF(request == oap->oap_request,
1621 "request %p != oap_request %p\n",
1622 request, oap->oap_request);
1623 if (oap->oap_interrupted) {
1624 ptlrpc_req_finished(new_req);
1629 /* New request takes over pga and oaps from old request.
1630 * Note that copying a list_head doesn't work, need to move it... */
1632 new_req->rq_interpret_reply = request->rq_interpret_reply;
1633 new_req->rq_async_args = request->rq_async_args;
1634 new_req->rq_commit_cb = request->rq_commit_cb;
1635 /* cap resend delay to the current request timeout, this is similar to
1636 * what ptlrpc does (see after_reply()) */
1637 if (aa->aa_resends > new_req->rq_timeout)
1638 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1640 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
/* Pin the import generation so eviction is detected on completion. */
1641 new_req->rq_generation_set = 1;
1642 new_req->rq_import_generation = request->rq_import_generation;
1644 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new request. */
1646 INIT_LIST_HEAD(&new_aa->aa_oaps);
1647 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1648 INIT_LIST_HEAD(&new_aa->aa_exts);
1649 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1650 new_aa->aa_resends = aa->aa_resends;
/* Re-point each async page's request reference at the new request. */
1652 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1653 if (oap->oap_request) {
1654 ptlrpc_req_finished(oap->oap_request);
1655 oap->oap_request = ptlrpc_request_addref(new_req);
1659 /* XXX: This code will run into problem if we're going to support
1660 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1661 * and wait for all of them to be finished. We should inherit request
1662 * set from old request. */
1663 ptlrpcd_add_req(new_req);
1665 DEBUG_REQ(D_INFO, new_req, "new request");
1670 * ugh, we want disk allocation on the target to happen in offset order. we'll
1671 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1672 * fine for our small page arrays and doesn't require allocation. its an
1673 * insertion sort that swaps elements that are strides apart, shrinking the
1674 * stride down until its '1' and the array is sorted.
/* Sort \a array of \a num brw_page pointers in-place by ascending ->off.
 * Shellsort with the 3h+1 (Knuth) gap sequence; O(1) extra space. */
1676 static void sort_brw_pages(struct brw_page **array, int num)
1679 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num... */
1683 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* ...then gapped insertion sort (the stride-shrinking step and loop
 * framing are in elided lines). */
1688 for (i = stride ; i < num ; i++) {
1691 while (j >= stride && array[j - stride]->off > tmp->off) {
1692 array[j] = array[j - stride];
1697 } while (stride > 1);
/* Free a brw_page pointer array of \a count entries previously allocated
 * with OBD_ALLOC (see osc_build_rpc()).  Frees only the array itself, not
 * the pages it points at. */
1700 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1702 LASSERT(ppga != NULL);
1703 OBD_FREE(ppga, sizeof(*ppga) * count);
/* Reply-interpret callback for BRW RPCs (installed as rq_interpret_reply in
 * osc_build_rpc()).  Finishes the request, retries recoverable errors,
 * propagates server-returned attributes to the cl_object, completes all
 * extents, and updates the in-flight RPC accounting.
 *
 * \param env   execution environment
 * \param req   the completed request
 * \param data  the osc_brw_async_args stored in rq_async_args
 * \param rc    transfer result from ptlrpc
 */
1706 static int brw_interpret(const struct lu_env *env,
1707 struct ptlrpc_request *req, void *data, int rc)
1709 struct osc_brw_async_args *aa = data;
1710 struct osc_extent *ext;
1711 struct osc_extent *tmp;
1712 struct client_obd *cli = aa->aa_cli;
1715 rc = osc_brw_fini_request(req, rc);
1716 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1717 /* When server return -EINPROGRESS, client should always retry
1718 * regardless of the number of times the bulk was resent already. */
1719 if (osc_recoverable_error(rc)) {
/* Import generation changed => client was evicted; do not resend
 * across an eviction, just report it. */
1720 if (req->rq_import_generation !=
1721 req->rq_import->imp_generation) {
1722 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1723 ""DOSTID", rc = %d.\n",
1724 req->rq_import->imp_obd->obd_name,
1725 POSTID(&aa->aa_oa->o_oi), rc);
1726 } else if (rc == -EINPROGRESS ||
1727 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1728 rc = osc_brw_redo_request(req, aa, rc);
1730 CERROR("%s: too many resent retries for object: "
1731 "%llu:%llu, rc = %d.\n",
1732 req->rq_import->imp_obd->obd_name,
1733 POSTID(&aa->aa_oa->o_oi), rc);
1738 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: push server-returned attributes into the cl_object
 * attribute cache under the attr lock. */
1743 struct obdo *oa = aa->aa_oa;
1744 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1745 unsigned long valid = 0;
1746 struct cl_object *obj;
1747 struct osc_async_page *last;
1749 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1750 obj = osc2cl(last->oap_obj);
1752 cl_object_attr_lock(obj);
1753 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1754 attr->cat_blocks = oa->o_blocks;
1755 valid |= CAT_BLOCKS;
1757 if (oa->o_valid & OBD_MD_FLMTIME) {
1758 attr->cat_mtime = oa->o_mtime;
1761 if (oa->o_valid & OBD_MD_FLATIME) {
1762 attr->cat_atime = oa->o_atime;
1765 if (oa->o_valid & OBD_MD_FLCTIME) {
1766 attr->cat_ctime = oa->o_ctime;
1770 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1771 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1772 loff_t last_off = last->oap_count + last->oap_obj_off +
1775 /* Change file size if this is an out of quota or
1776 * direct IO write and it extends the file size */
1777 if (loi->loi_lvb.lvb_size < last_off) {
1778 attr->cat_size = last_off;
1781 /* Extend KMS if it's not a lockless write */
1782 if (loi->loi_kms < last_off &&
1783 oap2osc_page(last)->ops_srvlock == 0) {
1784 attr->cat_kms = last_off;
1790 cl_object_attr_update(env, obj, attr, valid);
1791 cl_object_attr_unlock(obj);
1793 OBDO_FREE(aa->aa_oa);
/* Successful writes pin pages as "unstable" until the server commits
 * the transaction (released via brw_commit()). */
1795 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1796 osc_inc_unstable_pages(req);
/* Complete every extent covered by this RPC, then free the page array. */
1798 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1799 list_del_init(&ext->oe_link);
1800 osc_extent_finish(env, ext, 1, rc);
1802 LASSERT(list_empty(&aa->aa_exts));
1803 LASSERT(list_empty(&aa->aa_oaps));
1805 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1806 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1808 spin_lock(&cli->cl_loi_list_lock);
1809 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1810 * is called so we know whether to go to sync BRWs or wait for more
1811 * RPCs to complete */
1812 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1813 cli->cl_w_in_flight--;
1815 cli->cl_r_in_flight--;
1816 osc_wake_cache_waiters(cli);
1817 spin_unlock(&cli->cl_loi_list_lock);
/* An RPC slot just freed up: try to launch more queued I/O. */
1819 osc_io_unplug(env, cli, NULL);
/* rq_commit_cb for BRW write requests: called when the server transaction
 * containing this write has committed to disk, so the pages it covers can
 * stop being counted as "unstable".
 *
 * The rq_unstable flag is tested-and-cleared under rq_lock so that exactly
 * one of this callback and osc_extent_finish()'s path performs the
 * osc_dec_unstable_pages() accounting. */
1823 static void brw_commit(struct ptlrpc_request *req)
1825 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1826 * this called via the rq_commit_cb, I need to ensure
1827 * osc_dec_unstable_pages is still called. Otherwise unstable
1828 * pages may be leaked. */
1829 spin_lock(&req->rq_lock);
1830 if (likely(req->rq_unstable)) {
1831 req->rq_unstable = 0;
1832 spin_unlock(&req->rq_lock);
/* Drop the lock before the (potentially heavier) accounting call. */
1834 osc_dec_unstable_pages(req);
/* Loser of the race just records that the commit happened. */
1836 req->rq_committed = 1;
1837 spin_unlock(&req->rq_lock);
1842 * Build an RPC by the list of extent @ext_list. The caller must ensure
1843 * that the total pages in this list are NOT over max pages per RPC.
1844 * Extents in the list must be in OES_RPC state.
/* \param env       execution environment
 * \param cli       client_obd to account the RPC against
 * \param ext_list  extents (OES_RPC state) to be covered by one BRW RPC;
 *                  on success they are moved onto the request's aa_exts,
 *                  on failure they are finished with an error
 * \param cmd       OBD_BRW_READ or OBD_BRW_WRITE
 * \retval          0 on successful submission, negative errno on error */
1846 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1847 struct list_head *ext_list, int cmd)
1849 struct ptlrpc_request *req = NULL;
1850 struct osc_extent *ext;
1851 struct brw_page **pga = NULL;
1852 struct osc_brw_async_args *aa = NULL;
1853 struct obdo *oa = NULL;
1854 struct osc_async_page *oap;
1855 struct osc_object *obj = NULL;
1856 struct cl_req_attr *crattr = NULL;
1857 loff_t starting_offset = OBD_OBJECT_EOF;
1858 loff_t ending_offset = 0;
1862 bool soft_sync = false;
1863 bool interrupted = false;
1867 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1868 struct ost_body *body;
1870 LASSERT(!list_empty(ext_list));
1872 /* add pages into rpc_list to build BRW rpc */
/* First pass: aggregate page count, grant and memalloc state. */
1873 list_for_each_entry(ext, ext_list, oe_link) {
1874 LASSERT(ext->oe_state == OES_RPC);
1875 mem_tight |= ext->oe_memalloc;
1876 grant += ext->oe_grants;
1877 page_count += ext->oe_nr_pages;
1882 soft_sync = osc_over_unstable_soft_limit(cli);
/* Mark memory pressure so allocations below may dip into reserves;
 * restored before returning. */
1884 mpflag = cfs_memory_pressure_get_and_set();
1886 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1888 GOTO(out, rc = -ENOMEM);
1892 GOTO(out, rc = -ENOMEM);
/* Second pass: fill pga[], link pages onto rpc_list, and track the
 * min/max byte range of the RPC. */
1895 list_for_each_entry(ext, ext_list, oe_link) {
1896 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1898 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1900 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1901 pga[i] = &oap->oap_brw_page;
1902 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1905 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1906 if (starting_offset == OBD_OBJECT_EOF ||
1907 starting_offset > oap->oap_obj_off)
1908 starting_offset = oap->oap_obj_off;
1910 LASSERT(oap->oap_page_off == 0);
1911 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1912 ending_offset = oap->oap_obj_off +
1915 LASSERT(oap->oap_page_off + oap->oap_count ==
1917 if (oap->oap_interrupted)
1922 /* first page in the list */
1923 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Fill request attributes (jobid, obdo fields...) from the layers above. */
1925 crattr = &osc_env_info(env)->oti_req_attr;
1926 memset(crattr, 0, sizeof(*crattr));
1927 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1928 crattr->cra_flags = ~0ULL;
1929 crattr->cra_page = oap2cl_page(oap);
1930 crattr->cra_oa = oa;
1931 cl_req_attr_set(env, osc2cl(obj), crattr);
1933 if (cmd == OBD_BRW_WRITE)
1934 oa->o_grant_used = grant;
/* Server wants pages in offset order for better disk allocation. */
1936 sort_brw_pages(pga, page_count);
1937 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1939 CERROR("prep_req failed: %d\n", rc);
1943 req->rq_commit_cb = brw_commit;
1944 req->rq_interpret_reply = brw_interpret;
1945 req->rq_memalloc = mem_tight != 0;
1946 oap->oap_request = ptlrpc_request_addref(req);
1947 if (interrupted && !req->rq_intr)
1948 ptlrpc_mark_interrupted(req);
1950 /* Need to update the timestamps after the request is built in case
1951 * we race with setattr (locally or in queue at OST). If OST gets
1952 * later setattr before earlier BRW (as determined by the request xid),
1953 * the OST will not use BRW timestamps. Sadly, there is no obvious
1954 * way to do this in a single call. bug 10150 */
1955 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1956 crattr->cra_oa = &body->oa;
1957 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1958 cl_req_attr_set(env, osc2cl(obj), crattr);
1959 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Hand the page/extent lists over to the request's async args; from
 * here cleanup happens in brw_interpret(). */
1961 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1962 aa = ptlrpc_req_async_args(req);
1963 INIT_LIST_HEAD(&aa->aa_oaps);
1964 list_splice_init(&rpc_list, &aa->aa_oaps);
1965 INIT_LIST_HEAD(&aa->aa_exts);
1966 list_splice_init(ext_list, &aa->aa_exts);
/* In-flight accounting and lprocfs histograms, under cl_loi_list_lock. */
1968 spin_lock(&cli->cl_loi_list_lock);
1969 starting_offset >>= PAGE_SHIFT;
1970 if (cmd == OBD_BRW_READ) {
1971 cli->cl_r_in_flight++;
1972 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1973 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1974 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1975 starting_offset + 1);
1977 cli->cl_w_in_flight++;
1978 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1979 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1980 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1981 starting_offset + 1);
1983 spin_unlock(&cli->cl_loi_list_lock);
1985 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1986 page_count, aa, cli->cl_r_in_flight,
1987 cli->cl_w_in_flight);
1988 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
/* Queue to ptlrpcd for asynchronous send. */
1990 ptlrpcd_add_req(req);
1996 cfs_memory_pressure_restore(mpflag);
/* Error path: free what we allocated and fail every extent. */
1999 LASSERT(req == NULL);
2004 OBD_FREE(pga, sizeof(*pga) * page_count);
2005 /* this should happen rarely and is pretty bad, it makes the
2006 * pending list not follow the dirty order */
2007 while (!list_empty(ext_list)) {
2008 ext = list_entry(ext_list->next, struct osc_extent,
2010 list_del_init(&ext->oe_link);
2011 osc_extent_finish(env, ext, 0, rc);
/* Attach \a data (the OSC object) to \a lock's l_ast_data under the lock's
 * resource lock.  Setting only succeeds if l_ast_data is NULL or already
 * equals \a data; the return statement is in elided lines but by the
 * structure it reports whether l_ast_data == data afterwards. */
2017 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2021 LASSERT(lock != NULL);
2023 lock_res_and_lock(lock);
2025 if (lock->l_ast_data == NULL)
2026 lock->l_ast_data = data;
2027 if (lock->l_ast_data == data)
2030 unlock_res_and_lock(lock);
/* Common completion for an OSC lock enqueue: translate an intent-aborted
 * reply into its real status, mark the LVB ready, invoke the caller's
 * upcall, and drop the enqueue reference on the lock.
 *
 * \param req      the enqueue request (used only for the intent reply)
 * \param upcall   caller's completion callback
 * \param cookie   opaque argument for \a upcall
 * \param lockh    handle of the lock that was enqueued/matched
 * \param mode     lock mode for the decref
 * \param flags    in/out enqueue flags (LDLM_FL_LVB_READY may be set)
 * \param agl      non-zero for speculative (AGL) enqueues
 * \param errcode  enqueue result (ELDLM_* or negative errno)
 */
2035 static int osc_enqueue_fini(struct ptlrpc_request *req,
2036 osc_enqueue_upcall_f upcall, void *cookie,
2037 struct lustre_handle *lockh, enum ldlm_mode mode,
2038 __u64 *flags, int agl, int errcode)
2040 bool intent = *flags & LDLM_FL_HAS_INTENT;
2044 /* The request was created before ldlm_cli_enqueue call. */
2045 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2046 struct ldlm_reply *rep;
2048 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2049 LASSERT(rep != NULL);
/* lock_policy_res1 carries the intent's real status in network
 * byte order for errors. */
2051 rep->lock_policy_res1 =
2052 ptlrpc_status_ntoh(rep->lock_policy_res1);
2053 if (rep->lock_policy_res1)
2054 errcode = rep->lock_policy_res1;
2056 *flags |= LDLM_FL_LVB_READY;
2057 } else if (errcode == ELDLM_OK) {
2058 *flags |= LDLM_FL_LVB_READY;
2061 /* Call the update callback. */
2062 rc = (*upcall)(cookie, lockh, errcode);
2064 /* release the reference taken in ldlm_cli_enqueue() */
2065 if (errcode == ELDLM_LOCK_MATCHED)
2067 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2068 ldlm_lock_decref(lockh, mode);
/* Reply-interpret callback for asynchronous lock enqueues queued via
 * ptlrpcd.  Completes the DLM side (ldlm_cli_enqueue_fini) and then the
 * OSC side (osc_enqueue_fini), holding an extra lock reference across the
 * upcall to order it before any blocking AST.
 *
 * \param aa  the osc_enqueue_args saved in rq_async_args at enqueue time
 */
2073 static int osc_enqueue_interpret(const struct lu_env *env,
2074 struct ptlrpc_request *req,
2075 struct osc_enqueue_args *aa, int rc)
2077 struct ldlm_lock *lock;
2078 struct lustre_handle *lockh = &aa->oa_lockh;
2079 enum ldlm_mode mode = aa->oa_mode;
2080 struct ost_lvb *lvb = aa->oa_lvb;
2081 __u32 lvb_len = sizeof(*lvb);
2086 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2088 lock = ldlm_handle2lock(lockh);
2089 LASSERTF(lock != NULL,
2090 "lockh %#llx, req %p, aa %p - client evicted?\n",
2091 lockh->cookie, req, aa);
2093 /* Take an additional reference so that a blocking AST that
2094 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2095 * to arrive after an upcall has been executed by
2096 * osc_enqueue_fini(). */
2097 ldlm_lock_addref(lockh, mode);
2099 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2100 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2102 /* Let CP AST to grant the lock first. */
2103 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL path (branch header elided): aa->oa_lvb/oa_flags were left NULL
 * at enqueue time, so point oa_flags at a local for the fini call. */
2106 LASSERT(aa->oa_lvb == NULL);
2107 LASSERT(aa->oa_flags == NULL);
2108 aa->oa_flags = &flags;
2111 /* Complete obtaining the lock procedure. */
2112 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2113 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2115 /* Complete osc stuff. */
2116 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2117 aa->oa_flags, aa->oa_agl, rc);
2119 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above and the handle2lock ref. */
2121 ldlm_lock_decref(lockh, mode);
2122 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers of osc_enqueue_base() pass this
 * (instead of a real set) to request that the RPC be queued directly to
 * ptlrpcd — compared by address in osc_enqueue_base(), never dereferenced. */
2126 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2128 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2129 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2130 * other synchronous requests, however keeping some locks and trying to obtain
2131 * others may take a considerable amount of time in a case of ost failure; and
2132 * when other sync requests do not get released lock from a client, the client
2133 * is evicted from the cluster -- such scenarious make the life difficult, so
2134 * release locks just after they are obtained. */
/* Enqueue (or match) an extent lock on \a res_id.
 *
 * \param flags     in/out LDLM_FL_* flags
 * \param policy    extent to lock; rounded out to page boundaries below
 * \param lvb       buffer for the server-returned LVB (size/times)
 * \param kms_valid non-zero when the cached KMS may be trusted
 * \param upcall    completion callback, invoked on grant/match/error
 * \param rqset     NULL for synchronous, PTLRPCD_SET for ptlrpcd, or a
 *                  caller-owned set for async completion
 * \param async     non-zero to return as soon as the RPC is queued
 * \param agl       non-zero for speculative (AGL) enqueues
 */
2135 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2136 __u64 *flags, union ldlm_policy_data *policy,
2137 struct ost_lvb *lvb, int kms_valid,
2138 osc_enqueue_upcall_f upcall, void *cookie,
2139 struct ldlm_enqueue_info *einfo,
2140 struct ptlrpc_request_set *rqset, int async, int agl)
2142 struct obd_device *obd = exp->exp_obd;
2143 struct lustre_handle lockh = { 0 };
2144 struct ptlrpc_request *req = NULL;
2145 int intent = *flags & LDLM_FL_HAS_INTENT;
2146 __u64 match_flags = *flags;
2147 enum ldlm_mode mode;
2151 /* Filesystem lock extents are extended to page boundaries so that
2152 * dealing with the page cache is a little smoother. */
2153 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2154 policy->l_extent.end |= ~PAGE_MASK;
2157 * kms is not valid when either object is completely fresh (so that no
2158 * locks are cached), or object was evicted. In the latter case cached
2159 * lock cannot be used, because it would prime inode state with
2160 * potentially stale LVB.
2165 /* Next, search for already existing extent locks that will cover us */
2166 /* If we're trying to read, we also search for an existing PW lock. The
2167 * VFS and page cache already protect us locally, so lots of readers/
2168 * writers can share a single PW lock.
2170 * There are problems with conversion deadlocks, so instead of
2171 * converting a read lock to a write lock, we'll just enqueue a new
2174 * At some point we should cancel the read lock instead of making them
2175 * send us a blocking callback, but there are problems with canceling
2176 * locks out from other users right now, too. */
2177 mode = einfo->ei_mode;
2178 if (einfo->ei_mode == LCK_PR)
2181 match_flags |= LDLM_FL_LVB_READY;
2183 match_flags |= LDLM_FL_BLOCK_GRANTED;
2184 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2185 einfo->ei_type, policy, mode, &lockh, 0);
/* A matching cached lock was found. */
2187 struct ldlm_lock *matched;
2189 if (*flags & LDLM_FL_TEST_LOCK)
2192 matched = ldlm_handle2lock(&lockh);
2194 /* AGL enqueues DLM locks speculatively. Therefore if
2195 * it already exists a DLM lock, it wll just inform the
2196 * caller to cancel the AGL process for this stripe. */
2197 ldlm_lock_decref(&lockh, mode);
2198 LDLM_LOCK_PUT(matched);
2200 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2201 *flags |= LDLM_FL_LVB_READY;
2203 /* We already have a lock, and it's referenced. */
2204 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2206 ldlm_lock_decref(&lockh, mode);
2207 LDLM_LOCK_PUT(matched);
2210 ldlm_lock_decref(&lockh, mode);
2211 LDLM_LOCK_PUT(matched);
2216 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No usable cached lock: build a real enqueue RPC (intent case). */
2220 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2221 &RQF_LDLM_ENQUEUE_LVB);
2225 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2227 ptlrpc_request_free(req);
2231 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2233 ptlrpc_request_set_replen(req);
2236 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2237 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2239 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2240 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash completion state and hand off to ptlrpcd/rqset. */
2243 struct osc_enqueue_args *aa;
2244 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2245 aa = ptlrpc_req_async_args(req);
2247 aa->oa_mode = einfo->ei_mode;
2248 aa->oa_type = einfo->ei_type;
2249 lustre_handle_copy(&aa->oa_lockh, &lockh);
2250 aa->oa_upcall = upcall;
2251 aa->oa_cookie = cookie;
2254 aa->oa_flags = flags;
2257 /* AGL is essentially to enqueue an DLM lock
2258 * in advance, so we don't care about the
2259 * result of AGL enqueue. */
2261 aa->oa_flags = NULL;
2264 req->rq_interpret_reply =
2265 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2266 if (rqset == PTLRPCD_SET)
2267 ptlrpcd_add_req(req);
2269 ptlrpc_set_add_req(rqset, req);
2270 } else if (intent) {
2271 ptlrpc_req_finished(req);
/* Sync path: complete inline. */
2276 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2279 ptlrpc_req_finished(req);
/* Look up an already-granted extent lock covering \a policy, without ever
 * enqueuing a new one.
 *
 * \param flags  in: LDLM_FL_* match flags (LDLM_FL_TEST_LOCK => probe only)
 * \param data   attached to the matched lock via osc_set_lock_data()
 * \param lockh  out: handle of the matched lock
 * \param unref  passed through to ldlm_lock_match()
 * \retval       the matched mode, or 0 (early-return lines are elided)
 */
2284 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2285 enum ldlm_type type, union ldlm_policy_data *policy,
2286 enum ldlm_mode mode, __u64 *flags, void *data,
2287 struct lustre_handle *lockh, int unref)
2289 struct obd_device *obd = exp->exp_obd;
2290 __u64 lflags = *flags;
/* Fault-injection hook to force "no match". */
2294 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2297 /* Filesystem lock extents are extended to page boundaries so that
2298 * dealing with the page cache is a little smoother */
2299 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2300 policy->l_extent.end |= ~PAGE_MASK;
2302 /* Next, search for already existing extent locks that will cover us */
2303 /* If we're trying to read, we also search for an existing PW lock. The
2304 * VFS and page cache already protect us locally, so lots of readers/
2305 * writers can share a single PW lock. */
2309 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2310 res_id, type, policy, rc, lockh, unref);
2311 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
/* Matched for real: bind our data to the lock, dropping the match
 * reference if another object already owns it. */
2315 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2317 LASSERT(lock != NULL);
2318 if (!osc_set_lock_data(lock, data)) {
2319 ldlm_lock_decref(lockh, rc);
2322 LDLM_LOCK_PUT(lock);
/* Reply-interpret callback for asynchronous OST_STATFS RPCs queued by
 * osc_statfs_async(): unpack the obd_statfs reply into the caller's buffer
 * and invoke the caller's completion callback (oi_cb_up). */
2327 static int osc_statfs_interpret(const struct lu_env *env,
2328 struct ptlrpc_request *req,
2329 struct osc_async_args *aa, int rc)
2331 struct obd_statfs *msfs;
2335 /* The request has in fact never been sent
2336 * due to issues at a higher level (LOV).
2337 * Exit immediately since the caller is
2338 * aware of the problem and takes care
2339 * of the clean up */
/* For NODELAY (procfs) statfs, connection errors are not fatal to the
 * caller; the handling of this branch is in elided lines. */
2342 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2343 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2349 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2351 GOTO(out, rc = -EPROTO);
/* Struct copy of the server's statfs into the caller's buffer. */
2354 *aa->aa_oi->oi_osfs = *msfs;
2356 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Issue an asynchronous OST_STATFS RPC; the result is delivered to
 * \a oinfo via osc_statfs_interpret() when the reply arrives.
 *
 * \param max_age  freshness hint; currently unused on the wire (see the
 *                 comment below) — TODO confirm whether elided lines use it
 * \param rqset    request set the RPC is added to
 */
2360 static int osc_statfs_async(struct obd_export *exp,
2361 struct obd_info *oinfo, __u64 max_age,
2362 struct ptlrpc_request_set *rqset)
2364 struct obd_device *obd = class_exp2obd(exp);
2365 struct ptlrpc_request *req;
2366 struct osc_async_args *aa;
2370 /* We could possibly pass max_age in the request (as an absolute
2371 * timestamp or a "seconds.usec ago") so the target can avoid doing
2372 * extra calls into the filesystem if that isn't necessary (e.g.
2373 * during mount that would help a bit). Having relative timestamps
2374 * is not so great if request processing is slow, while absolute
2375 * timestamps are not ideal because they need time synchronization. */
2376 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2380 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2382 ptlrpc_request_free(req);
2385 ptlrpc_request_set_replen(req);
/* statfs is served from the OST's create portal. */
2386 req->rq_request_portal = OST_CREATE_PORTAL;
2387 ptlrpc_at_set_req_timeout(req);
2389 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2390 /* procfs requests not want stat in wait for avoid deadlock */
2391 req->rq_no_resend = 1;
2392 req->rq_no_delay = 1;
2395 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2396 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2397 aa = ptlrpc_req_async_args(req);
2400 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the request, wait for the reply and copy
 * the server statistics into @osfs.
 *
 * The import is looked up under cl_sem and reference-counted because
 * the call may race with client_disconnect_export() (see bug 15684).
 *
 * NOTE(review): lines are elided in this excerpt; the copy into @osfs
 * and the final return are not visible, only the cleanup path.
 */
2404 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2405 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2407 struct obd_device *obd = class_exp2obd(exp);
2408 struct obd_statfs *msfs;
2409 struct ptlrpc_request *req;
2410 struct obd_import *imp = NULL;
2414 /*Since the request might also come from lprocfs, so we need
2415 *sync this with client_disconnect_export Bug15684*/
2416 down_read(&obd->u.cli.cl_sem);
2417 if (obd->u.cli.cl_import)
2418 imp = class_import_get(obd->u.cli.cl_import);
2419 up_read(&obd->u.cli.cl_sem);
2423 /* We could possibly pass max_age in the request (as an absolute
2424 * timestamp or a "seconds.usec ago") so the target can avoid doing
2425 * extra calls into the filesystem if that isn't necessary (e.g.
2426 * during mount that would help a bit). Having relative timestamps
2427 * is not so great if request processing is slow, while absolute
2428 * timestamps are not ideal because they need time synchronization. */
2429 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference taken above is dropped once the request holds
 * its own reference (or allocation failed). */
2431 class_import_put(imp);
2436 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2438 ptlrpc_request_free(req);
2441 ptlrpc_request_set_replen(req);
2442 req->rq_request_portal = OST_CREATE_PORTAL;
2443 ptlrpc_at_set_req_timeout(req);
2445 if (flags & OBD_STATFS_NODELAY) {
2446 /* procfs requests not want stat in wait for avoid deadlock */
2447 req->rq_no_resend = 1;
2448 req->rq_no_delay = 1;
/* Blocking send + wait for the reply. */
2451 rc = ptlrpc_queue_wait(req);
2455 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2457 GOTO(out, rc = -EPROTO);
2464 ptlrpc_req_finished(req);
/*
 * ioctl dispatcher for the OSC device.
 *
 * Takes a module reference for the duration of the call so the module
 * cannot be unloaded while an ioctl is in flight; unrecognised commands
 * return -ENOTTY.
 *
 * NOTE(review): lines are elided in this excerpt; the switch statement
 * opening and several break/GOTO lines are not visible.
 */
2468 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2469 void *karg, void __user *uarg)
2471 struct obd_device *obd = exp->exp_obd;
2472 struct obd_ioctl_data *data = karg;
2476 if (!try_module_get(THIS_MODULE)) {
2477 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2478 module_name(THIS_MODULE));
2482 case OBD_IOC_CLIENT_RECOVER:
/* Kick import recovery; target name comes from the ioctl buffer. */
2483 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2484 data->ioc_inlbuf1, 0);
2488 case IOC_OSC_SET_ACTIVE:
2489 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2492 case OBD_IOC_PING_TARGET:
2493 err = ptlrpc_obd_ping(obd);
2496 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2497 cmd, current_comm());
2498 GOTO(out, err = -ENOTTY);
/* Balance the try_module_get() above on every exit path. */
2501 module_put(THIS_MODULE);
/*
 * Handle obd_set_info_async() keys for the OSC layer.
 *
 * Several keys are handled locally without any RPC (KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK);
 * everything else is forwarded to the OST via an OST_SET_INFO RPC.
 * KEY_GRANT_SHRINK requests are sent through ptlrpcd with a dedicated
 * interpret callback; all other RPCs require a caller-provided @set.
 *
 * NOTE(review): lines are elided in this excerpt; RETURN statements and
 * some error branches are not visible.
 */
2505 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2506 u32 keylen, void *key,
2507 u32 vallen, void *val,
2508 struct ptlrpc_request_set *set)
2510 struct ptlrpc_request *req;
2511 struct obd_device *obd = exp->exp_obd;
2512 struct obd_import *imp = class_exp2cliimp(exp);
2517 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2519 if (KEY_IS(KEY_CHECKSUM)) {
/* Value must be exactly an int; normalize to 0/1. */
2520 if (vallen != sizeof(int))
2522 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2526 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2527 sptlrpc_conf_client_adapt(obd);
2531 if (KEY_IS(KEY_FLUSH_CTX)) {
2532 sptlrpc_import_flush_my_ctx(imp);
2536 if (KEY_IS(KEY_CACHE_SET)) {
2537 struct client_obd *cli = &obd->u.cli;
2539 LASSERT(cli->cl_cache == NULL); /* only once */
2540 cli->cl_cache = (struct cl_client_cache *)val;
2541 cl_cache_incref(cli->cl_cache);
2542 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2544 /* add this osc into entity list */
2545 LASSERT(list_empty(&cli->cl_lru_osc));
2546 spin_lock(&cli->cl_cache->ccc_lru_lock);
2547 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2548 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2553 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2554 struct client_obd *cli = &obd->u.cli;
/* Shrink at most half of the LRU pages, capped by the caller's
 * target. */
2555 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2556 long target = *(long *)val;
2558 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* From here on an RPC is required: a request set is mandatory for
 * every key except KEY_GRANT_SHRINK (which goes via ptlrpcd). */
2563 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2566 /* We pass all other commands directly to OST. Since nobody calls osc
2567 methods directly and everybody is supposed to go through LOV, we
2568 assume lov checked invalid values for us.
2569 The only recognised values so far are evict_by_nid and mds_conn.
2570 Even if something bad goes through, we'd get a -EINVAL from OST
2573 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2574 &RQF_OST_SET_GRANT_INFO :
2579 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2580 RCL_CLIENT, keylen);
2581 if (!KEY_IS(KEY_GRANT_SHRINK))
2582 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2583 RCL_CLIENT, vallen);
2584 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2586 ptlrpc_request_free(req);
/* Copy key and value into the request buffers. */
2590 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2591 memcpy(tmp, key, keylen);
2592 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2595 memcpy(tmp, val, vallen);
2597 if (KEY_IS(KEY_GRANT_SHRINK)) {
2598 struct osc_grant_args *aa;
2601 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2602 aa = ptlrpc_req_async_args(req);
2605 ptlrpc_req_finished(req);
2608 *oa = ((struct ost_body *)val)->oa;
2610 req->rq_interpret_reply = osc_shrink_grant_interpret;
2613 ptlrpc_request_set_replen(req);
2614 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2615 LASSERT(set != NULL);
2616 ptlrpc_set_add_req(set, req);
2617 ptlrpc_check_set(NULL, set);
/* Grant-shrink RPCs are driven by the ptlrpcd daemon instead. */
2619 ptlrpcd_add_req(req);
/*
 * Reconnect hook: recompute the grant to request from the server.
 *
 * Under cl_loi_list_lock, sums available + reserved grant (plus dirty
 * grant or dirty pages depending on OBD_CONNECT_GRANT_PARAM) into
 * data->ocd_grant, defaulting to 2 * cli_brw_size(obd) when zero, and
 * atomically consumes cl_lost_grant.
 *
 * NOTE(review): lines are elided in this excerpt; function entry and
 * RETURN are not visible.
 */
2625 static int osc_reconnect(const struct lu_env *env,
2626 struct obd_export *exp, struct obd_device *obd,
2627 struct obd_uuid *cluuid,
2628 struct obd_connect_data *data,
2631 struct client_obd *cli = &obd->u.cli;
2633 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2637 spin_lock(&cli->cl_loi_list_lock);
2638 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2639 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2640 grant += cli->cl_dirty_grant;
2642 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* Never ask for zero grant; fall back to two full BRW sizes. */
2643 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2644 lost_grant = cli->cl_lost_grant;
2645 cli->cl_lost_grant = 0;
2646 spin_unlock(&cli->cl_loi_list_lock);
2648 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2649 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2650 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect hook: tear down the export, then remove this client from
 * the grant-shrink list only once the import is gone (see the ordering
 * rationale below, BUG18662).
 *
 * NOTE(review): lines are elided in this excerpt; the RETURN is not
 * visible.
 */
2656 static int osc_disconnect(struct obd_export *exp)
2658 struct obd_device *obd = class_exp2obd(exp);
2661 rc = client_disconnect_export(exp);
2663 * Initially we put del_shrink_grant before disconnect_export, but it
2664 * causes the following problem if setup (connect) and cleanup
2665 * (disconnect) are tangled together.
2666 * connect p1 disconnect p2
2667 * ptlrpc_connect_import
2668 * ............... class_manual_cleanup
2671 * ptlrpc_connect_interrupt
2673 * add this client to shrink list
2675 * Bang! pinger trigger the shrink.
2676 * So the osc should be disconnected from the shrink list, after we
2677 * are sure the import has been destroyed. BUG18662
2679 if (obd->u.cli.cl_import == NULL)
2680 osc_del_shrink_grant(&obd->u.cli);
/*
 * cfs_hash iterator callback: invalidate the osc_object attached to the
 * granted locks of one LDLM resource.
 *
 * Grabs one reference to the first osc_object found in the granted lock
 * list, clears LDLM_FL_CLEANED on every granted lock so a second
 * ldlm_namespace_cleanup() pass will cancel them, then invalidates and
 * releases the object.
 *
 * NOTE(review): lines are elided in this excerpt; resource locking and
 * the return value are not visible.
 */
2684 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2685 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2687 struct lu_env *env = arg;
2688 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2689 struct ldlm_lock *lock;
2690 struct osc_object *osc = NULL;
2694 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2695 if (lock->l_ast_data != NULL && osc == NULL) {
2696 osc = lock->l_ast_data;
2697 cl_object_get(osc2cl(osc));
2700 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2701 * by the 2nd round of ldlm_namespace_clean() call in
2702 * osc_import_event(). */
2703 ldlm_clear_cleaned(lock);
2708 osc_object_invalidate(env, osc);
2709 cl_object_put(env, osc2cl(osc));
/*
 * React to import state transitions (disconnect, invalidate, activate,
 * connect-data negotiation, ...) for this OSC.
 *
 * Most events are forwarded to the observer via obd_notify_observer();
 * IMP_EVENT_DISCON zeroes the grant counters, IMP_EVENT_INVALIDATE
 * cleans the LDLM namespace in two passes around an osc_object
 * invalidation sweep, and IMP_EVENT_OCD applies negotiated connect
 * flags (grant init, request portal).
 *
 * NOTE(review): lines are elided in this excerpt; the switch opening,
 * break statements and RETURN are not visible.
 */
2715 static int osc_import_event(struct obd_device *obd,
2716 struct obd_import *imp,
2717 enum obd_import_event event)
2719 struct client_obd *cli;
2723 LASSERT(imp->imp_obd == obd);
2726 case IMP_EVENT_DISCON: {
/* Grant is meaningless without a connection; reset the counters
 * under the list lock. */
2728 spin_lock(&cli->cl_loi_list_lock);
2729 cli->cl_avail_grant = 0;
2730 cli->cl_lost_grant = 0;
2731 spin_unlock(&cli->cl_loi_list_lock);
2734 case IMP_EVENT_INACTIVE: {
2735 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2738 case IMP_EVENT_INVALIDATE: {
2739 struct ldlm_namespace *ns = obd->obd_namespace;
/* First cleanup pass: cancel local-only locks. */
2743 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2745 env = cl_env_get(&refcheck);
2747 osc_io_unplug(env, &obd->u.cli, NULL);
/* Invalidate the osc_objects attached to surviving locks, clearing
 * LDLM_FL_CLEANED so the second pass below catches them. */
2749 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2750 osc_ldlm_resource_invalidate,
2752 cl_env_put(env, &refcheck);
/* Second cleanup pass (see osc_ldlm_resource_invalidate()). */
2754 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2759 case IMP_EVENT_ACTIVE: {
2760 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2763 case IMP_EVENT_OCD: {
2764 struct obd_connect_data *ocd = &imp->imp_connect_data;
2766 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2767 osc_init_grant(&obd->u.cli, ocd);
2770 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2771 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2773 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2776 case IMP_EVENT_DEACTIVATE: {
2777 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2780 case IMP_EVENT_ACTIVATE: {
2781 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2785 CERROR("Unknown import event %d\n", event);
* Determine whether the lock can be canceled before replaying the lock
* during recovery, see bug16774 for detailed information.
* \retval zero the lock can't be canceled
* \retval other ok to cancel
static int osc_cancel_weight(struct ldlm_lock *lock)
* Cancel all unused and granted extent lock.
/* A lock is cancelable when it is a granted extent lock with zero
 * weight (no pages pinned under it, per osc_ldlm_weigh_ast()). */
if (lock->l_resource->lr_type == LDLM_EXTENT &&
lock->l_granted_mode == lock->l_req_mode &&
osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback for the writeback work item allocated in
 * osc_setup(): flush pending bulk I/O for this client.
 */
2811 static int brw_queue_work(const struct lu_env *env, void *data)
2813 struct client_obd *cli = data;
2815 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2817 osc_io_unplug(env, cli, NULL);
/*
 * OBD setup hook for the OSC device.
 *
 * Performs, in order: ptlrpcd reference, generic client setup, the
 * writeback and LRU ptlrpcd work items, quota setup, procfs wiring
 * (under the OSP proc directory when both osc and osp live on the same
 * node), growing the shared OSC request pool, grant-shrink list init,
 * LDLM cancel-weight registration, and enlisting this client on the
 * global osc_shrink_list.
 *
 * NOTE(review): lines are elided in this excerpt; some error-branch and
 * RETURN lines (including the cleanup labels' introductions) are not
 * visible.
 */
2821 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2823 struct client_obd *cli = &obd->u.cli;
2824 struct obd_type *type;
2832 rc = ptlrpcd_addref();
2836 rc = client_obd_setup(obd, lcfg);
2838 GOTO(out_ptlrpcd, rc);
/* Per-client writeback work item, executed by ptlrpcd. */
2840 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2841 if (IS_ERR(handler))
2842 GOTO(out_client_setup, rc = PTR_ERR(handler));
2843 cli->cl_writeback_work = handler;
/* Per-client LRU-shrink work item. */
2845 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2846 if (IS_ERR(handler))
2847 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2848 cli->cl_lru_work = handler;
2850 rc = osc_quota_setup(obd);
2852 GOTO(out_ptlrpcd_work, rc);
2854 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2856 #ifdef CONFIG_PROC_FS
2857 obd->obd_vars = lprocfs_osc_obd_vars;
2859 /* If this is true then both client (osc) and server (osp) are on the
2860 * same node. The osp layer if loaded first will register the osc proc
2861 * directory. In that case this obd_device will be attached its proc
2862 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2863 type = class_search_type(LUSTRE_OSP_NAME);
2864 if (type && type->typ_procsym) {
2865 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2867 obd->obd_vars, obd);
2868 if (IS_ERR(obd->obd_proc_entry)) {
2869 rc = PTR_ERR(obd->obd_proc_entry);
/* procfs failure is non-fatal: log and continue without it. */
2870 CERROR("error %d setting up lprocfs for %s\n", rc,
2872 obd->obd_proc_entry = NULL;
2875 rc = lprocfs_obd_setup(obd, false);
2878 /* If the basic OSC proc tree construction succeeded then
2879 * lets do the rest. */
2881 lproc_osc_attach_seqstat(obd);
2882 sptlrpc_lprocfs_cliobd_attach(obd);
2883 ptlrpc_lprocfs_register_obd(obd);
2887 * We try to control the total number of requests with a upper limit
2888 * osc_reqpool_maxreqcount. There might be some race which will cause
2889 * over-limit allocation, but it is fine.
2891 req_count = atomic_read(&osc_pool_req_count);
2892 if (req_count < osc_reqpool_maxreqcount) {
2893 adding = cli->cl_max_rpcs_in_flight + 2;
2894 if (req_count + adding > osc_reqpool_maxreqcount)
2895 adding = osc_reqpool_maxreqcount - req_count;
2897 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2898 atomic_add(added, &osc_pool_req_count);
2901 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2902 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2904 spin_lock(&osc_shrink_lock);
2905 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2906 spin_unlock(&osc_shrink_lock);
/* Error unwind path (labels elided in this excerpt): destroy any work
 * items created above, then undo client_obd_setup(). */
2911 if (cli->cl_writeback_work != NULL) {
2912 ptlrpcd_destroy_work(cli->cl_writeback_work);
2913 cli->cl_writeback_work = NULL;
2915 if (cli->cl_lru_work != NULL) {
2916 ptlrpcd_destroy_work(cli->cl_lru_work);
2917 cli->cl_lru_work = NULL;
2920 client_obd_cleanup(obd);
/*
 * Pre-cleanup hook: destroy the ptlrpcd work items, flush the client
 * import and unregister procfs entries before osc_cleanup() runs.
 *
 * NOTE(review): lines are elided in this excerpt; the RETURN is not
 * visible.
 */
2926 static int osc_precleanup(struct obd_device *obd)
2928 struct client_obd *cli = &obd->u.cli;
2932 * for echo client, export may be on zombie list, wait for
2933 * zombie thread to cull it, because cli.cl_import will be
2934 * cleared in client_disconnect_export():
2935 * class_export_destroy() -> obd_cleanup() ->
2936 * echo_device_free() -> echo_client_cleanup() ->
2937 * obd_disconnect() -> osc_disconnect() ->
2938 * client_disconnect_export()
2940 obd_zombie_barrier();
2941 if (cli->cl_writeback_work) {
2942 ptlrpcd_destroy_work(cli->cl_writeback_work);
2943 cli->cl_writeback_work = NULL;
2946 if (cli->cl_lru_work) {
2947 ptlrpcd_destroy_work(cli->cl_lru_work);
2948 cli->cl_lru_work = NULL;
2951 obd_cleanup_client_import(obd);
2952 ptlrpc_lprocfs_unregister_obd(obd);
2953 lprocfs_obd_cleanup(obd);
/*
 * Final cleanup hook: unlink this client from the global shrink list,
 * detach from the shared client cache (dropping our reference), free
 * the quota cache and run generic client teardown.
 *
 * NOTE(review): lines are elided in this excerpt; the RETURN is not
 * visible.
 */
2957 int osc_cleanup(struct obd_device *obd)
2959 struct client_obd *cli = &obd->u.cli;
2964 spin_lock(&osc_shrink_lock);
2965 list_del(&cli->cl_shrink_list);
2966 spin_unlock(&osc_shrink_lock);
/* Undo the KEY_CACHE_SET attachment made in osc_set_info_async(). */
2969 if (cli->cl_cache != NULL) {
2970 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2971 spin_lock(&cli->cl_cache->ccc_lru_lock);
2972 list_del_init(&cli->cl_lru_osc);
2973 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2974 cli->cl_lru_left = NULL;
2975 cl_cache_decref(cli->cl_cache);
2976 cli->cl_cache = NULL;
2979 /* free memory of osc quota cache */
2980 osc_quota_cleanup(obd);
2982 rc = client_obd_cleanup(obd);
/*
 * Apply a PARAM_OSC configuration record via the proc-param machinery.
 * class_process_proc_param() returns >0 for "handled"; normalize that
 * to 0 so callers only see 0 or a negative errno.
 */
2988 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2990 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2991 return rc > 0 ? 0: rc;
/* obd_ops wrapper: adapt the generic (len, buf) signature to the
 * lustre_cfg-based helper above. */
2994 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2996 return osc_process_config_base(obd, buf);
/*
 * Method table exported to the OBD framework for the OSC device type.
 * Connection management reuses the generic client_* helpers; everything
 * else is implemented in this file or elsewhere in the osc module.
 */
2999 static struct obd_ops osc_obd_ops = {
3000 .o_owner = THIS_MODULE,
3001 .o_setup = osc_setup,
3002 .o_precleanup = osc_precleanup,
3003 .o_cleanup = osc_cleanup,
3004 .o_add_conn = client_import_add_conn,
3005 .o_del_conn = client_import_del_conn,
3006 .o_connect = client_connect_import,
3007 .o_reconnect = osc_reconnect,
3008 .o_disconnect = osc_disconnect,
3009 .o_statfs = osc_statfs,
3010 .o_statfs_async = osc_statfs_async,
3011 .o_create = osc_create,
3012 .o_destroy = osc_destroy,
3013 .o_getattr = osc_getattr,
3014 .o_setattr = osc_setattr,
3015 .o_iocontrol = osc_iocontrol,
3016 .o_set_info_async = osc_set_info_async,
3017 .o_import_event = osc_import_event,
3018 .o_process_config = osc_process_config,
3019 .o_quotactl = osc_quotactl,
/* Memory-shrinker handle registered in osc_init(), plus the global list
 * of client_obd's eligible for cache shrinking and the lock protecting
 * that list (clients join in osc_setup(), leave in osc_cleanup()). */
3022 static struct shrinker *osc_cache_shrinker;
3023 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3024 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Compatibility wrapper for kernels whose shrinker API has a single
 * ->shrink() callback instead of split ->count_objects()/->scan_objects()
 * (HAVE_SHRINKER_COUNT not defined): emulate count+scan with one call.
 */
3026 #ifndef HAVE_SHRINKER_COUNT
3027 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3029 struct shrink_control scv = {
3030 .nr_to_scan = shrink_param(sc, nr_to_scan),
3031 .gfp_mask = shrink_param(sc, gfp_mask)
3033 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3034 struct shrinker *shrinker = NULL;
/* Scan result intentionally discarded; old API expects the remaining
 * count as the return value. */
3037 (void)osc_cache_shrink_scan(shrinker, &scv);
3039 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: create the lu_kmem caches, register the OSC obd type
 * (suppressing its own proc root when OSP already provides one),
 * register the cache shrinker, and size + create the shared ptlrpc
 * request pool from osc_reqpool_mem_max (MB) and the rounded-up
 * OST_IO_MAXREQSIZE.
 *
 * NOTE(review): lines are elided in this excerpt; some error-branch and
 * RETURN lines (including the labels' introductions) are not visible.
 */
3043 static int __init osc_init(void)
3045 bool enable_proc = true;
3046 struct obd_type *type;
3047 unsigned int reqpool_size;
3048 unsigned int reqsize;
3050 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3051 osc_cache_shrink_count, osc_cache_shrink_scan);
3054 /* print an address of _any_ initialized kernel symbol from this
3055 * module, to allow debugging with gdb that doesn't support data
3056 * symbols from modules.*/
3057 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3059 rc = lu_kmem_init(osc_caches);
/* When OSP is loaded first it owns the shared proc directory; don't
 * register a second one. */
3063 type = class_search_type(LUSTRE_OSP_NAME);
3064 if (type != NULL && type->typ_procsym != NULL)
3065 enable_proc = false;
3067 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3068 LUSTRE_OSC_NAME, &osc_device_type);
3072 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3074 /* This is obviously too much memory, only prevent overflow here */
3075 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3076 GOTO(out_type, rc = -EINVAL);
/* Convert the module parameter from MB to bytes. */
3078 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to the next power of two. */
3081 while (reqsize < OST_IO_MAXREQSIZE)
3082 reqsize = reqsize << 1;
3085 * We don't enlarge the request count in OSC pool according to
3086 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3087 * tried after normal allocation failed. So a small OSC pool won't
3088 * cause much performance degression in most of cases.
3090 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3092 atomic_set(&osc_pool_req_count, 0);
3093 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3094 ptlrpc_add_rqs_to_pool);
3096 if (osc_rq_pool != NULL)
/* Error unwind (labels elided in this excerpt): unregister the obd
 * type, then free the caches. */
3100 class_unregister_type(LUSTRE_OSC_NAME);
3102 lu_kmem_fini(osc_caches);
/*
 * Module exit: mirror osc_init() in reverse — drop the shrinker,
 * unregister the obd type, free the caches and the request pool.
 */
3107 static void __exit osc_exit(void)
3109 remove_shrinker(osc_cache_shrinker);
3110 class_unregister_type(LUSTRE_OSC_NAME);
3111 lu_kmem_fini(osc_caches);
3112 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit registration. */
3115 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3116 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3117 MODULE_VERSION(LUSTRE_VERSION_STRING);
3118 MODULE_LICENSE("GPL");
3120 module_init(osc_init);
3121 module_exit(osc_exit);