4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre/lustre_user.h>
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
/* Shared request pool for OSC BRW RPCs: current count, configured
 * maximum, and the pool itself (allocated elsewhere in this file). */
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
/* read-only module parameter (0444): tunable only at load time */
65 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-request async context for bulk read/write (BRW) RPCs, stored in
 * req->rq_async_args and consumed by brw_interpret().
 * NOTE(review): listing is elided here — additional members (e.g. the
 * obdo and byte counts referenced later as aa_oa/aa_requested_nob)
 * are not visible in this chunk. */
67 struct osc_brw_async_args {
73 struct brw_page **aa_ppga;
74 struct client_obd *aa_cli;
75 struct list_head aa_oaps;
76 struct list_head aa_exts;
/* grant-shrink RPCs reuse the same async-args layout */
79 #define osc_grant_args osc_brw_async_args
/* Async-args blocks for setattr, fsync and ladvise RPCs: each carries
 * an obd_enqueue_update_f upcall (plus cookie, elided from this
 * listing) invoked from the matching *_interpret() callback. */
81 struct osc_setattr_args {
83 obd_enqueue_update_f sa_upcall;
87 struct osc_fsync_args {
88 struct osc_object *fa_obj;
90 obd_enqueue_update_f fa_upcall;
94 struct osc_ladvise_args {
96 obd_enqueue_update_f la_upcall;
/* Async-args for DLM lock enqueue: export, lock type/mode, upcall,
 * reply LVB, lock handle, and an AGL (async glimpse lock) bit flag. */
100 struct osc_enqueue_args {
101 struct obd_export *oa_exp;
102 enum ldlm_type oa_type;
103 enum ldlm_mode oa_mode;
105 osc_enqueue_upcall_f oa_upcall;
107 struct ost_lvb *oa_lvb;
108 struct lustre_handle oa_lockh;
109 unsigned int oa_agl:1;
/* Forward declarations for helpers defined later in this file. */
112 static void osc_release_ppga(struct brw_page **ppga, size_t count);
113 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Copy the in-memory obdo @oa into the request capsule's OST body,
 * converting to wire format per the import's connect data. */
116 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
118 struct ost_body *body;
120 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
123 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: allocate/pack the request, wait for the
 * reply, and unpack the returned attributes back into @oa.  Also
 * fills in the client BRW size as the block size. */
126 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
129 struct ptlrpc_request *req;
130 struct ost_body *body;
134 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
138 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failure: request was never sent, free it directly */
140 ptlrpc_request_free(req);
144 osc_pack_req_body(req, oa);
146 ptlrpc_request_set_replen(req);
148 rc = ptlrpc_queue_wait(req);
152 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* missing/short reply body is a protocol error */
154 GOTO(out, rc = -EPROTO);
156 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
157 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* report the client's preferred BRW size as blksize */
159 oa->o_blksize = cli_brw_size(exp->exp_obd);
160 oa->o_valid |= OBD_MD_FLBLKSZ;
164 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the attributes in @oa to the OST and
 * read back the server's view of the object on success. */
169 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
172 struct ptlrpc_request *req;
173 struct ost_body *body;
/* group must always be set so the OST can locate the object */
177 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
179 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
183 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
185 ptlrpc_request_free(req);
189 osc_pack_req_body(req, oa);
191 ptlrpc_request_set_replen(req);
193 rc = ptlrpc_queue_wait(req);
197 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
199 GOTO(out, rc = -EPROTO);
201 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
205 ptlrpc_req_finished(req);
/* Reply callback shared by async setattr and punch: unpack the reply
 * obdo into sa->sa_oa, then invoke the caller's upcall with the
 * final rc.  Runs from ptlrpcd context. */
210 static int osc_setattr_interpret(const struct lu_env *env,
211 struct ptlrpc_request *req,
212 struct osc_setattr_args *sa, int rc)
214 struct ost_body *body;
220 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
222 GOTO(out, rc = -EPROTO);
224 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* hand the result to whoever queued the setattr */
227 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs the request and dispatches it
 * either via ptlrpcd (fire-and-forget or PTLRPCD_SET) or by adding it
 * to the caller-supplied @rqset; completion runs
 * osc_setattr_interpret() which calls @upcall(@cookie, rc).
 * NOTE(review): the branch structure between the "no rqset" and
 * "rqset" paths is elided in this listing. */
231 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
232 obd_enqueue_update_f upcall, void *cookie,
233 struct ptlrpc_request_set *rqset)
235 struct ptlrpc_request *req;
236 struct osc_setattr_args *sa;
241 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oa);
253 ptlrpc_request_set_replen(req);
255 /* do mds to ost setattr asynchronously */
257 /* Do not wait for response. */
258 ptlrpcd_add_req(req);
260 req->rq_interpret_reply =
261 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async-args must fit in the fixed-size rq_async_args area */
263 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
264 sa = ptlrpc_req_async_args(req);
266 sa->sa_upcall = upcall;
267 sa->sa_cookie = cookie;
269 if (rqset == PTLRPCD_SET)
270 ptlrpcd_add_req(req);
272 ptlrpc_set_add_req(rqset, req);
/* Reply callback for OST_LADVISE: copy the reply obdo back to the
 * caller's obdo and invoke the upcall with the final rc. */
278 static int osc_ladvise_interpret(const struct lu_env *env,
279 struct ptlrpc_request *req,
282 struct osc_ladvise_args *la = arg;
283 struct ost_body *body;
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
/* struct copy of the whole reply obdo */
293 *la->la_oa = body->oa;
295 rc = la->la_upcall(la->la_cookie, rc);
300 * If rqset is NULL, do not wait for response. Upcall and cookie could also
301 * be NULL in this case
/* Send an OST_LADVISE RPC carrying @num_advise lu_ladvise entries
 * from @ladvise_hdr.  The ladvise buffer size is variable, so the
 * capsule size is set before packing.  Uses the OST I/O portal. */
303 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
304 struct ladvise_hdr *ladvise_hdr,
305 obd_enqueue_update_f upcall, void *cookie,
306 struct ptlrpc_request_set *rqset)
308 struct ptlrpc_request *req;
309 struct ost_body *body;
310 struct osc_ladvise_args *la;
312 struct lu_ladvise *req_ladvise;
313 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
314 int num_advise = ladvise_hdr->lah_count;
315 struct ladvise_hdr *req_ladvise_hdr;
318 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* variable-length advise array: size the capsule before packing */
322 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
323 num_advise * sizeof(*ladvise));
324 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
326 ptlrpc_request_free(req);
329 req->rq_request_portal = OST_IO_PORTAL;
330 ptlrpc_at_set_req_timeout(req);
332 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
334 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
337 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
338 &RMF_OST_LADVISE_HDR);
339 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
341 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
342 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
343 ptlrpc_request_set_replen(req);
346 /* Do not wait for response. */
347 ptlrpcd_add_req(req);
351 req->rq_interpret_reply = osc_ladvise_interpret;
352 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
353 la = ptlrpc_req_async_args(req);
355 la->la_upcall = upcall;
356 la->la_cookie = cookie;
358 if (rqset == PTLRPCD_SET)
359 ptlrpcd_add_req(req);
361 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo-client objects here
 * (see the fid_seq_is_echo assertion); regular object creation goes
 * through a different path. */
366 static int osc_create(const struct lu_env *env, struct obd_export *exp,
369 struct ptlrpc_request *req;
370 struct ost_body *body;
375 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
/* this entry point only serves echo-sequence objects */
376 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
380 GOTO(out, rc = -ENOMEM);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
384 ptlrpc_request_free(req);
388 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
391 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
393 ptlrpc_request_set_replen(req);
395 rc = ptlrpc_queue_wait(req);
399 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
401 GOTO(out_req, rc = -EPROTO);
403 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
404 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
406 oa->o_blksize = cli_brw_size(exp->exp_obd);
407 oa->o_valid |= OBD_MD_FLBLKSZ;
409 CDEBUG(D_HA, "transno: "LPD64"\n",
410 lustre_msg_get_transno(req->rq_repmsg));
412 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate/hole-punch).  Completion reuses
 * osc_setattr_interpret(), which calls @upcall(@cookie, rc). */
417 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
418 obd_enqueue_update_f upcall, void *cookie,
419 struct ptlrpc_request_set *rqset)
421 struct ptlrpc_request *req;
422 struct osc_setattr_args *sa;
423 struct ost_body *body;
427 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
431 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
433 ptlrpc_request_free(req);
436 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
437 ptlrpc_at_set_req_timeout(req);
439 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
441 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
443 ptlrpc_request_set_replen(req);
/* punch shares the setattr reply interpreter */
445 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
446 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
447 sa = ptlrpc_req_async_args(req);
449 sa->sa_upcall = upcall;
450 sa->sa_cookie = cookie;
451 if (rqset == PTLRPCD_SET)
452 ptlrpcd_add_req(req);
454 ptlrpc_set_add_req(rqset, req);
/* Reply callback for OST_SYNC: copy the reply obdo back, refresh the
 * osc object's cl_attr blocks from the reply (under the attr lock),
 * then invoke the caller's upcall. */
459 static int osc_sync_interpret(const struct lu_env *env,
460 struct ptlrpc_request *req,
463 struct osc_fsync_args *fa = arg;
464 struct ost_body *body;
465 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
466 unsigned long valid = 0;
467 struct cl_object *obj;
473 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 CERROR("can't unpack ost_body\n");
476 GOTO(out, rc = -EPROTO);
479 *fa->fa_oa = body->oa;
480 obj = osc2cl(fa->fa_obj);
482 /* Update osc object's blocks attribute */
483 cl_object_attr_lock(obj);
484 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
485 attr->cat_blocks = body->oa.o_blocks;
490 cl_object_attr_update(env, obj, attr, valid);
491 cl_object_attr_unlock(obj);
494 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC for @obj.  The oa size/blocks fields carry
 * the start/end of the range to sync (see comment below).  Completion
 * runs osc_sync_interpret(). */
498 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
499 obd_enqueue_update_f upcall, void *cookie,
500 struct ptlrpc_request_set *rqset)
502 struct obd_export *exp = osc_export(obj);
503 struct ptlrpc_request *req;
504 struct ost_body *body;
505 struct osc_fsync_args *fa;
509 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
513 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
515 ptlrpc_request_free(req);
519 /* overload the size and blocks fields in the oa with start/end */
520 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
522 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
524 ptlrpc_request_set_replen(req);
525 req->rq_interpret_reply = osc_sync_interpret;
527 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
528 fa = ptlrpc_req_async_args(req);
531 fa->fa_upcall = upcall;
532 fa->fa_cookie = cookie;
534 if (rqset == PTLRPCD_SET)
535 ptlrpcd_add_req(req);
537 ptlrpc_set_add_req(rqset, req);
542 /* Find and cancel locally locks matched by @mode in the resource found by
543 * @objid. Found locks are added into @cancel list. Returns the amount of
544 * locks added to @cancels list. */
545 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
546 struct list_head *cancels,
547 enum ldlm_mode mode, __u64 lock_flags)
549 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
550 struct ldlm_res_id res_id;
551 struct ldlm_resource *res;
555 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
556 * export) but disabled through procfs (flag in NS).
558 * This distinguishes from a case when ELC is not supported originally,
559 * when we still want to cancel locks in advance and just cancel them
560 * locally, without sending any RPC. */
561 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* build the DLM resource name from the object id and look it up */
564 ostid_build_res_name(&oa->o_oi, &res_id);
565 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
569 LDLM_RESOURCE_ADDREF(res);
570 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
571 lock_flags, 0, NULL);
572 LDLM_RESOURCE_DELREF(res);
573 ldlm_resource_putref(res);
/* Reply callback for OST_DESTROY: drop the in-flight destroy count
 * and wake anyone throttled in osc_destroy() waiting to send. */
577 static int osc_destroy_interpret(const struct lu_env *env,
578 struct ptlrpc_request *req, void *data,
581 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
583 atomic_dec(&cli->cl_destroy_in_flight);
584 wake_up(&cli->cl_destroy_waitq);
/* Throttle destroy RPCs to cl_max_rpcs_in_flight: optimistically take
 * a slot; if over the limit, release it and (if the count dropped
 * concurrently) wake other waiters so nobody sleeps on a free slot. */
588 static int osc_can_send_destroy(struct client_obd *cli)
590 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
591 cli->cl_max_rpcs_in_flight) {
592 /* The destroy request can be sent */
595 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
596 cli->cl_max_rpcs_in_flight) {
598 * The counter has been modified between the two atomic
601 wake_up(&cli->cl_destroy_waitq);
/* Asynchronous OST_DESTROY.  First cancels matching local PW locks
 * (ELC, with DISCARD_DATA) and piggybacks the cancels on the destroy
 * request; throttles via osc_can_send_destroy() before dispatching
 * through ptlrpcd without waiting for the reply. */
606 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
609 struct client_obd *cli = &exp->exp_obd->u.cli;
610 struct ptlrpc_request *req;
611 struct ost_body *body;
612 struct list_head cancels = LIST_HEAD_INIT(cancels);
617 CDEBUG(D_INFO, "oa NULL\n");
621 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
622 LDLM_FL_DISCARD_DATA);
624 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: release the locks collected for ELC */
626 ldlm_lock_list_put(&cancels, l_bl_ast, count);
630 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
633 ptlrpc_request_free(req);
637 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
638 ptlrpc_at_set_req_timeout(req);
640 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
642 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
644 ptlrpc_request_set_replen(req);
646 req->rq_interpret_reply = osc_destroy_interpret;
647 if (!osc_can_send_destroy(cli)) {
648 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
651 * Wait until the number of on-going destroy RPCs drops
652 * under max_rpc_in_flight
654 l_wait_event_exclusive(cli->cl_destroy_waitq,
655 osc_can_send_destroy(cli), &lwi);
658 /* Do not wait for response */
659 ptlrpcd_add_req(req);
/* Fill the grant-accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client_obd state, under
 * cl_loi_list_lock, so the server can rebalance grant.  Sanity-checks
 * dirty accounting against per-client and global limits. */
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
666 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
/* caller must not have pre-set the fields we are about to fill */
668 LASSERT(!(oa->o_valid & bits));
671 spin_lock(&cli->cl_loi_list_lock);
672 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673 oa->o_dirty = cli->cl_dirty_grant;
675 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
676 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677 cli->cl_dirty_max_pages)) {
678 CERROR("dirty %lu - %lu > dirty_max %lu\n",
679 cli->cl_dirty_pages, cli->cl_dirty_transit,
680 cli->cl_dirty_max_pages);
682 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683 atomic_long_read(&obd_dirty_transit_pages) >
684 (long)(obd_max_dirty_pages + 1))) {
685 /* The atomic_read() allowing the atomic_inc() are
686 * not covered by a lock thus they may safely race and trip
687 * this CERROR() unless we add in a small fudge factor (+1). */
688 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689 cli_name(cli), atomic_long_read(&obd_dirty_pages),
690 atomic_long_read(&obd_dirty_transit_pages),
691 obd_max_dirty_pages);
693 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
695 CERROR("dirty %lu - dirty_max %lu too big???\n",
696 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
699 unsigned long nrpages;
/* ask for enough grant to cover a full window of in-flight RPCs */
701 nrpages = cli->cl_max_pages_per_rpc;
702 nrpages *= cli->cl_max_rpcs_in_flight + 1;
703 nrpages = max(nrpages, cli->cl_dirty_max_pages);
704 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
705 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
709 /* take extent tax into account when asking for more
711 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
712 cli->cl_max_extent_pages;
713 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
716 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717 oa->o_dropped = cli->cl_lost_grant;
718 cli->cl_lost_grant = 0;
719 spin_unlock(&cli->cl_loi_list_lock);
720 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
721 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Re-arm the grant-shrink deadline one interval into the future. */
724 void osc_update_next_shrink(struct client_obd *cli)
726 cli->cl_next_shrink_grant =
727 cfs_time_shift(cli->cl_grant_shrink_interval);
728 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the list
 * lock. */
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
734 spin_lock(&cli->cl_loi_list_lock);
735 cli->cl_avail_grant += grant;
736 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
741 if (body->oa.o_valid & OBD_MD_FLGRANT) {
742 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
743 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration; used by osc_shrink_grant_to_target() below. */
747 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
748 u32 keylen, void *key,
749 u32 vallen, void *val,
750 struct ptlrpc_request_set *set);
/* Reply callback for a grant-shrink set_info RPC.  On failure the
 * grant we tried to give back is restored locally; on success any
 * extra grant in the reply body is absorbed. */
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753 struct ptlrpc_request *req,
756 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758 struct ost_body *body;
/* RPC failed: reclaim the grant we attempted to return */
761 __osc_update_grant(cli, oa->o_grant);
765 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
767 osc_update_grant(cli, body);
/* Give back a quarter of the available grant by piggybacking it on an
 * outgoing RPC: move it from cl_avail_grant into oa->o_grant and mark
 * the obdo with OBD_FL_SHRINK_GRANT. */
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
775 spin_lock(&cli->cl_loi_list_lock);
776 oa->o_grant = cli->cl_avail_grant / 4;
777 cli->cl_avail_grant -= oa->o_grant;
778 spin_unlock(&cli->cl_loi_list_lock);
779 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780 oa->o_valid |= OBD_MD_FLFLAGS;
783 oa->o_flags |= OBD_FL_SHRINK_GRANT;
/* push the next shrink deadline out */
784 osc_update_next_shrink(cli);
787 /* Shrink the current grant, either from some large amount to enough for a
788 * full set of in-flight RPCs, or if we have already shrunk to that limit
789 * then to enough for a single RPC. This avoids keeping more grant than
790 * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
793 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
796 spin_lock(&cli->cl_loi_list_lock);
/* already at/below the in-flight window: shrink to one RPC's worth */
797 if (cli->cl_avail_grant <= target_bytes)
798 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
799 spin_unlock(&cli->cl_loi_list_lock);
801 return osc_shrink_grant_to_target(cli, target_bytes);
/* Return grant above @target_bytes to the server via a
 * KEY_GRANT_SHRINK set_info RPC.  The lower bound is one full RPC's
 * worth; if the RPC cannot be sent the grant is restored locally. */
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
807 struct ost_body *body;
810 spin_lock(&cli->cl_loi_list_lock);
811 /* Don't shrink if we are already above or below the desired limit
812 * We don't want to shrink below a single RPC, as that will negatively
813 * impact block allocation and long-term performance. */
814 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
815 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
817 if (target_bytes >= cli->cl_avail_grant) {
818 spin_unlock(&cli->cl_loi_list_lock);
821 spin_unlock(&cli->cl_loi_list_lock);
827 osc_announce_cached(cli, &body->oa, 0);
829 spin_lock(&cli->cl_loi_list_lock);
/* the surplus above target is what we hand back */
830 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831 cli->cl_avail_grant = target_bytes;
832 spin_unlock(&cli->cl_loi_list_lock);
833 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834 body->oa.o_valid |= OBD_MD_FLFLAGS;
835 body->oa.o_flags = 0;
837 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838 osc_update_next_shrink(cli);
840 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842 sizeof(*body), body, NULL);
/* on failure, take the grant back so it is not lost */
844 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink grant: requires the server to
 * support OBD_CONNECT_GRANT_SHRINK, the shrink deadline to have
 * (nearly) arrived, a FULL import, and more grant than one RPC. */
849 static int osc_should_shrink_grant(struct client_obd *client)
851 cfs_time_t time = cfs_time_current();
852 cfs_time_t next_shrink = client->cl_next_shrink_grant;
854 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855 OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow a small early window (5 ticks) before the deadline */
858 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859 /* Get the current RPC size directly, instead of going via:
860 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861 * Keep comment here so that it can be found by searching. */
862 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
864 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865 client->cl_avail_grant > brw_size)
868 osc_update_next_shrink(client);
/* Periodic timeout callback: walk all clients registered on this
 * timeout item and shrink grant for those that are due. */
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
875 struct client_obd *client;
877 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878 if (osc_should_shrink_grant(client))
879 osc_shrink_grant(client);
/* Register @client with the ptlrpc timeout machinery so
 * osc_grant_shrink_grant_cb() fires every shrink interval. */
884 static int osc_add_shrink_grant(struct client_obd *client)
888 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
890 osc_grant_shrink_grant_cb, NULL,
891 &client->cl_grant_shrink_list);
893 CERROR("add grant client %s error %d\n", cli_name(client), rc);
896 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897 osc_update_next_shrink(client);
/* Unregister @client from the grant-shrink timeout list. */
901 static int osc_del_shrink_grant(struct client_obd *client)
903 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the server's connect data @ocd: set
 * cl_avail_grant (accounting for dirty/reserved unless evicted),
 * derive extent-tax/chunk-size/max-extent parameters when the server
 * supports GRANT_PARAM, and enable periodic grant shrinking when the
 * server supports GRANT_SHRINK. */
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
910 * ocd_grant is the total grant amount we're expect to hold: if we've
911 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
912 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
915 * race is tolerable here: if we're evicted, but imp_state already
916 * left EVICTED state, then cl_dirty_pages must be 0 already.
918 spin_lock(&cli->cl_loi_list_lock);
919 cli->cl_avail_grant = ocd->ocd_grant;
920 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921 cli->cl_avail_grant -= cli->cl_reserved_grant;
922 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923 cli->cl_avail_grant -= cli->cl_dirty_grant;
925 cli->cl_avail_grant -=
926 cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
929 if (cli->cl_avail_grant < 0) {
930 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
931 cli_name(cli), cli->cl_avail_grant,
932 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
933 /* workaround for servers which do not have the patch from
935 cli->cl_avail_grant = ocd->ocd_grant;
938 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
941 /* overhead for each extent insertion */
942 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
943 /* determine the appropriate chunk size used by osc_extent. */
944 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
945 ocd->ocd_grant_blkbits);
946 /* determine maximum extent size, in #pages */
947 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
948 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
949 if (cli->cl_max_extent_pages == 0)
950 cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM: fall back to page-size chunks, default max extent */
952 cli->cl_grant_extent_tax = 0;
953 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
954 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
956 spin_unlock(&cli->cl_loi_list_lock);
958 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
959 "chunk bits: %d cl_max_extent_pages: %d\n",
961 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
962 cli->cl_max_extent_pages);
964 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
965 list_empty(&cli->cl_grant_shrink_list))
966 osc_add_shrink_grant(cli);
969 /* We assume that the reason this OSC got a short read is because it read
970 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
971 * via the LOV, and it _knows_ it's reading inside the file, it's just that
972 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of a short read: skip the bytes that arrived,
 * zero the remainder of the partially-filled page, then zero all
 * remaining pages.  Pages are kmap'ed for the memset (kunmap calls
 * are elided in this listing). */
973 static void handle_short_read(int nob_read, size_t page_count,
974 struct brw_page **pga)
979 /* skip bytes read OK */
980 while (nob_read > 0) {
981 LASSERT (page_count > 0);
983 if (pga[i]->count > nob_read) {
984 /* EOF inside this page */
985 ptr = kmap(pga[i]->pg) +
986 (pga[i]->off & ~PAGE_MASK);
987 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
994 nob_read -= pga[i]->count;
999 /* zero remaining pages */
1000 while (page_count-- > 0) {
1001 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1002 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf RC vector in a BRW_WRITE reply: every RC
 * must be present and zero, and the bulk must have transferred
 * exactly the requested number of bytes. */
1008 static int check_write_rcs(struct ptlrpc_request *req,
1009 int requested_nob, int niocount,
1010 size_t page_count, struct brw_page **pga)
1015 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1016 sizeof(*remote_rcs) *
1018 if (remote_rcs == NULL) {
1019 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1023 /* return error if any niobuf was in error */
1024 for (i = 0; i < niocount; i++) {
1025 if ((int)remote_rcs[i] < 0)
1026 return(remote_rcs[i]);
/* positive non-zero RCs are protocol violations */
1028 if (remote_rcs[i] != 0) {
1029 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1030 i, remote_rcs[i], req);
1035 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1036 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1037 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are file-contiguous
 * (p1 ends where p2 starts) and their flags differ only in bits known
 * to be safe to combine; unknown flag differences are warned about. */
1044 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1046 if (p1->flag != p2->flag) {
1047 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1048 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1049 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1051 /* warn if we try to combine flags that we don't know to be
1052 * safe to combine */
1053 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1054 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1055 "report this at https://jira.hpdd.intel.com/\n",
1056 p1->flag, p2->flag);
1061 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over @nob bytes spread across @pga using
 * the cfs_crypto hash selected by @cksum_type.  Contains two fault
 * injection points: corrupting receive data (reads) and returning a
 * wrong checksum (writes) for checksum-failure testing. */
1064 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1065 struct brw_page **pga, int opc,
1066 cksum_type_t cksum_type)
1070 struct cfs_crypto_hash_desc *hdesc;
1071 unsigned int bufsize;
1073 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1075 LASSERT(pg_count > 0);
1077 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1078 if (IS_ERR(hdesc)) {
1079 CERROR("Unable to initialize checksum hash %s\n",
1080 cfs_crypto_hash_name(cfs_alg));
1081 return PTR_ERR(hdesc);
1084 while (nob > 0 && pg_count > 0) {
1085 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1087 /* corrupt the data before we compute the checksum, to
1088 * simulate an OST->client data error */
1089 if (i == 0 && opc == OST_READ &&
1090 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1091 unsigned char *ptr = kmap(pga[i]->pg);
1092 int off = pga[i]->off & ~PAGE_MASK;
1094 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1097 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1098 pga[i]->off & ~PAGE_MASK,
1100 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1101 (int)(pga[i]->off & ~PAGE_MASK));
1103 nob -= pga[i]->count;
1108 bufsize = sizeof(cksum);
1109 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1111 /* For sending we only compute the wrong checksum instead
1112 * of corrupting the data so it is still correct on a redo */
1113 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete BRW (bulk read/write) request for @page_count
 * pages in @pga: allocate the request (from the shared pool for
 * writes), size the niobuf array by merging contiguous pages, prepare
 * the bulk descriptor, fill body/ioobj/niobufs, attach grant info,
 * optionally compute a client checksum, and return the prepared
 * request via @reqp.  @resend marks a recovery resend.
 * NOTE(review): this listing elides several lines (error branches,
 * some assignments); comments below describe only visible code. */
1120 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1121 u32 page_count, struct brw_page **pga,
1122 struct ptlrpc_request **reqp, int resend)
1124 struct ptlrpc_request *req;
1125 struct ptlrpc_bulk_desc *desc;
1126 struct ost_body *body;
1127 struct obd_ioobj *ioobj;
1128 struct niobuf_remote *niobuf;
1129 int niocount, i, requested_nob, opc, rc;
1130 struct osc_brw_async_args *aa;
1131 struct req_capsule *pill;
1132 struct brw_page *pg_prev;
/* fault injection: simulated allocation / fatal failures */
1135 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1136 RETURN(-ENOMEM); /* Recoverable */
1137 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1138 RETURN(-EINVAL); /* Fatal */
/* writes draw from the preallocated request pool; reads allocate */
1140 if ((cmd & OBD_BRW_WRITE) != 0) {
1142 req = ptlrpc_request_alloc_pool(cli->cl_import,
1144 &RQF_OST_BRW_WRITE);
1147 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* one niobuf per run of mergeable (contiguous, same-flag) pages */
1152 for (niocount = i = 1; i < page_count; i++) {
1153 if (!can_merge_pages(pga[i - 1], pga[i]))
1157 pill = &req->rq_pill;
1158 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1160 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1161 niocount * sizeof(*niobuf));
1163 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1165 ptlrpc_request_free(req);
1168 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1169 ptlrpc_at_set_req_timeout(req);
1170 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1172 req->rq_no_retry_einprogress = 1;
1174 desc = ptlrpc_prep_bulk_imp(req, page_count,
1175 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1176 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1177 PTLRPC_BULK_PUT_SINK) |
1178 PTLRPC_BULK_BUF_KIOV,
1180 &ptlrpc_bulk_kiov_pin_ops);
1183 GOTO(out, rc = -ENOMEM);
1184 /* NB request now owns desc and will free it when it gets freed */
1186 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1187 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1188 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1189 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1191 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1193 obdo_to_ioobj(oa, ioobj);
1194 ioobj->ioo_bufcnt = niocount;
1195 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1196 * that might be send for this request. The actual number is decided
1197 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1198 * "max - 1" for old client compatibility sending "0", and also so the
1199 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1200 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1201 LASSERT(page_count > 0);
/* walk the pages: add each to the bulk and build merged niobufs */
1203 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1204 struct brw_page *pg = pga[i];
1205 int poff = pg->off & ~PAGE_MASK;
1207 LASSERT(pg->count > 0);
1208 /* make sure there is no gap in the middle of page array */
1209 LASSERTF(page_count == 1 ||
1210 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1211 ergo(i > 0 && i < page_count - 1,
1212 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1213 ergo(i == page_count - 1, poff == 0)),
1214 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1215 i, page_count, pg, pg->off, pg->count);
1216 LASSERTF(i == 0 || pg->off > pg_prev->off,
1217 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1218 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1220 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1221 pg_prev->pg, page_private(pg_prev->pg),
1222 pg_prev->pg->index, pg_prev->off);
1223 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1224 (pg->flag & OBD_BRW_SRVLOCK));
1226 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1227 requested_nob += pg->count;
/* mergeable with previous page: extend the current niobuf */
1229 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1231 niobuf->rnb_len += pg->count;
1233 niobuf->rnb_offset = pg->off;
1234 niobuf->rnb_len = pg->count;
1235 niobuf->rnb_flags = pg->flag;
/* sanity: niobuf pointer must have advanced exactly niocount slots */
1240 LASSERTF((void *)(niobuf - niocount) ==
1241 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1242 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1243 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1245 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1247 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1248 body->oa.o_valid |= OBD_MD_FLFLAGS;
1249 body->oa.o_flags = 0;
/* mark recovery resends so the server can detect replays */
1251 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1254 if (osc_should_shrink_grant(cli))
1255 osc_shrink_grant_local(cli, &body->oa);
1257 /* size[REQ_REC_OFF] still sizeof (*body) */
1258 if (opc == OST_WRITE) {
1259 if (cli->cl_checksum &&
1260 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1261 /* store cl_cksum_type in a local variable since
1262 * it can be changed via lprocfs */
1263 cksum_type_t cksum_type = cli->cl_cksum_type;
1265 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1266 oa->o_flags &= OBD_FL_LOCAL_MASK;
1267 body->oa.o_flags = 0;
1269 body->oa.o_flags |= cksum_type_pack(cksum_type);
1270 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1275 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1277 /* save this in 'oa', too, for later checking */
1278 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1279 oa->o_flags |= cksum_type_pack(cksum_type);
1281 /* clear out the checksum flag, in case this is a
1282 * resend but cl_checksum is no longer set. b=11238 */
1283 oa->o_valid &= ~OBD_MD_FLCKSUM;
1285 oa->o_cksum = body->oa.o_cksum;
1286 /* 1 RC per niobuf */
1287 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1288 sizeof(__u32) * niocount);
/* read path: request a server checksum when enabled */
1290 if (cli->cl_checksum &&
1291 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1292 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1293 body->oa.o_flags = 0;
1294 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1295 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1298 ptlrpc_request_set_replen(req);
1300 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1301 aa = ptlrpc_req_async_args(req);
1303 aa->aa_requested_nob = requested_nob;
1304 aa->aa_nio_count = niocount;
1305 aa->aa_page_count = page_count;
1309 INIT_LIST_HEAD(&aa->aa_oaps);
1312 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1313 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1314 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1315 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* error path: release the (possibly pooled) request */
1319 ptlrpc_req_finished(req);
/*
 * Diagnose a BRW write checksum mismatch reported by the OST.
 *
 * If the server-computed checksum equals the client's, just log and bail
 * (success path).  Otherwise re-checksum the page array locally and compare
 * against both values to classify where the data changed: wrong checksum
 * type used by the server, data modified on the client after checksumming
 * (mmap race, bug 11742), or corruption in transit.  Loud console/CERROR
 * output is intentional -- this indicates possible data corruption.
 *
 * NOTE(review): this chunk has elided lines (the early-return body after the
 * match, the new_cksum declaration, and the final return), so the exact
 * return convention is not visible here -- presumably non-zero on mismatch;
 * confirm against the full source.
 */
1323 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1324 __u32 client_cksum, __u32 server_cksum, int nob,
1325 size_t page_count, struct brw_page **pga,
1326 cksum_type_t client_cksum_type)
1330 cksum_type_t cksum_type;
1332 if (server_cksum == client_cksum) {
1333 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Server may have replied with a different checksum type than we sent;
 * unpack it from oa->o_flags (valid only when OBD_MD_FLFLAGS is set). */
1337 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
/* Re-checksum the exact pages we sent, using the server's type. */
1339 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1342 if (cksum_type != client_cksum_type)
1343 msg = "the server did not use the checksum type specified in "
1344 "the original request - likely a protocol problem";
1345 else if (new_cksum == server_cksum)
1346 msg = "changed on the client after we checksummed it - "
1347 "likely false positive due to mmap IO (bug 11742)";
1348 else if (new_cksum == client_cksum)
1349 msg = "changed in transit before arrival at OST";
1351 msg = "changed in transit AND doesn't match the original - "
1352 "likely false positive due to mmap IO (bug 11742)";
1354 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1355 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1356 msg, libcfs_nid2str(peer->nid),
1357 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1358 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1359 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1360 POSTID(&oa->o_oi), pga[0]->off,
1361 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1362 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1363 "client csum now %x\n", client_cksum, client_cksum_type,
1364 server_cksum, cksum_type, new_cksum);
/*
 * Finish a bulk read/write RPC: unpack the reply, update grant/quota state,
 * verify bulk data integrity, and (for reads) handle short transfers.
 *
 * \param req  the completed BRW ptlrpc request
 * \param rc   number of bytes transferred on entry (or a negative errno);
 *             converted to 0 / -errno semantics by the time we return
 */
1368 /* Note rc enters this function as number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1371 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1372 const lnet_process_id_t *peer =
1373 &req->rq_import->imp_connection->c_peer;
1374 struct client_obd *cli = aa->aa_cli;
1375 struct ost_body *body;
1376 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply body (quota flags below). */
1379 if (rc < 0 && rc != -EDQUOT) {
1380 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1384 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1385 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1387 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1391 /* set/clear over quota flag for a uid/gid */
1392 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1393 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1394 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1396 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1397 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1399 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1402 osc_update_grant(cli, body);
/* Remember the checksum we sent with the request before &body->oa
 * (the server's view) is merged back into aa->aa_oa below. */
1407 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1408 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1410 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1412 CERROR("Unexpected +ve rc %d\n", rc);
1415 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1417 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1420 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1421 check_write_checksum(&body->oa, peer, client_cksum,
1422 body->oa.o_cksum, aa->aa_requested_nob,
1423 aa->aa_page_count, aa->aa_ppga,
1424 cksum_type_unpack(aa->aa_oa->o_flags)))
1427 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1428 aa->aa_page_count, aa->aa_ppga);
1432 /* The rest of this function executes only for OST_READs */
1434 /* if unwrap_bulk failed, return -EAGAIN to retry */
1435 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1437 GOTO(out, rc = -EAGAIN);
/* Sanity: server may not send more than we asked for... */
1439 if (rc > aa->aa_requested_nob) {
1440 CERROR("Unexpected rc %d (%d requested)\n", rc,
1441 aa->aa_requested_nob);
/* ...and the reply byte count must match what LNet actually moved. */
1445 if (rc != req->rq_bulk->bd_nob_transferred) {
1446 CERROR ("Unexpected rc %d (%d transferred)\n",
1447 rc, req->rq_bulk->bd_nob_transferred);
/* Short read (e.g. EOF): zero-fill the tail pages. */
1451 if (rc < aa->aa_requested_nob)
1452 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1454 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1455 static int cksum_counter;
1456 u32 server_cksum = body->oa.o_cksum;
1459 cksum_type_t cksum_type;
1461 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1462 body->oa.o_flags : 0);
/* Checksum only the bytes actually received (rc), not the request size. */
1463 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1464 aa->aa_ppga, OST_READ,
/* Data may have been forwarded by an LNet router; name it in errors. */
1467 if (peer->nid != req->rq_bulk->bd_sender) {
1469 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1472 if (server_cksum != client_cksum) {
1473 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1474 "%s%s%s inode "DFID" object "DOSTID
1475 " extent ["LPU64"-"LPU64"]\n",
1476 req->rq_import->imp_obd->obd_name,
1477 libcfs_nid2str(peer->nid),
1479 body->oa.o_valid & OBD_MD_FLFID ?
1480 body->oa.o_parent_seq : (__u64)0,
1481 body->oa.o_valid & OBD_MD_FLFID ?
1482 body->oa.o_parent_oid : 0,
1483 body->oa.o_valid & OBD_MD_FLFID ?
1484 body->oa.o_parent_ver : 0,
1485 POSTID(&body->oa.o_oi),
1486 aa->aa_ppga[0]->off,
1487 aa->aa_ppga[aa->aa_page_count-1]->off +
1488 aa->aa_ppga[aa->aa_page_count-1]->count -
1490 CERROR("client %x, server %x, cksum_type %x\n",
1491 client_cksum, server_cksum, cksum_type);
1493 aa->aa_oa->o_cksum = client_cksum;
1497 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server did not supply one;
 * rate-limit the complaint to powers of two via cksum_missed. */
1500 } else if (unlikely(client_cksum)) {
1501 static int cksum_missed;
1504 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1505 CERROR("Checksum %u requested from %s but not sent\n",
1506 cksum_missed, libcfs_nid2str(peer->nid));
/* Merge the server's returned attributes back into the local obdo. */
1512 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1513 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW RPC after a recoverable error (e.g. -EINPROGRESS).
 *
 * A brand-new request is prepared from the async args of the failed one; the
 * page array (pga) and pending oaps are transferred to the new request, each
 * oap's request reference is swapped over, and the new RPC is handed to
 * ptlrpcd.  If any oap was interrupted, the redo is abandoned.
 */
1518 static int osc_brw_redo_request(struct ptlrpc_request *request,
1519 struct osc_brw_async_args *aa, int rc)
1521 struct ptlrpc_request *new_req;
1522 struct osc_brw_async_args *new_aa;
1523 struct osc_async_page *oap;
1526 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1527 "redo for recoverable error %d", rc);
1529 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1530 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1531 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1532 aa->aa_ppga, &new_req, 1);
/* Every queued oap must still point at the request being redone. */
1536 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1537 if (oap->oap_request != NULL) {
1538 LASSERTF(request == oap->oap_request,
1539 "request %p != oap_request %p\n",
1540 request, oap->oap_request);
1541 if (oap->oap_interrupted) {
1542 ptlrpc_req_finished(new_req);
1547 /* New request takes over pga and oaps from old request.
1548 * Note that copying a list_head doesn't work, need to move it... */
1550 new_req->rq_interpret_reply = request->rq_interpret_reply;
1551 new_req->rq_async_args = request->rq_async_args;
1552 new_req->rq_commit_cb = request->rq_commit_cb;
1553 /* cap resend delay to the current request timeout, this is similar to
1554 * what ptlrpc does (see after_reply()) */
1555 if (aa->aa_resends > new_req->rq_timeout)
1556 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1558 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1559 new_req->rq_generation_set = 1;
1560 new_req->rq_import_generation = request->rq_import_generation;
1562 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new async args. */
1564 INIT_LIST_HEAD(&new_aa->aa_oaps);
1565 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1566 INIT_LIST_HEAD(&new_aa->aa_exts);
1567 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1568 new_aa->aa_resends = aa->aa_resends;
/* Re-point each oap's request reference at the replacement RPC. */
1570 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1571 if (oap->oap_request) {
1572 ptlrpc_req_finished(oap->oap_request);
1573 oap->oap_request = ptlrpc_request_addref(new_req);
1577 /* XXX: This code will run into problem if we're going to support
1578 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1579 * and wait for all of them to be finished. We should inherit request
1580 * set from old request. */
1581 ptlrpcd_add_req(new_req);
1583 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Sort a brw_page array by file offset (ascending) using shellsort.
 * In-place, no allocation; O(n^2) worst case but fine for the small
 * per-RPC page arrays this is used on.
 */
1588 * ugh, we want disk allocation on the target to happen in offset order. we'll
1589 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1590 * fine for our small page arrays and doesn't require allocation. its an
1591 * insertion sort that swaps elements that are strides apart, shrinking the
1592 * stride down until its '1' and the array is sorted.
1594 static void sort_brw_pages(struct brw_page **array, int num)
1597 struct brw_page *tmp;
/* Knuth gap sequence: 1, 4, 13, 40, ... (largest gap < num). */
1601 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1606 for (i = stride ; i < num ; i++) {
1609 while (j >= stride && array[j - stride]->off > tmp->off) {
1610 array[j] = array[j - stride];
1615 } while (stride > 1);
/*
 * Free a brw_page pointer array previously allocated for a BRW RPC.
 * Frees only the array of pointers, not the pages themselves.
 */
1618 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1620 LASSERT(ppga != NULL);
1621 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * RPC interpret callback for BRW requests (runs when the reply arrives).
 *
 * Finishes the request via osc_brw_fini_request(), resends on recoverable
 * errors (always for -EINPROGRESS, otherwise subject to the resend limit),
 * propagates returned size/time attributes into the cl_object, completes all
 * extents attached to the RPC, and updates the client's in-flight counters.
 */
1624 static int brw_interpret(const struct lu_env *env,
1625 struct ptlrpc_request *req, void *data, int rc)
1627 struct osc_brw_async_args *aa = data;
1628 struct osc_extent *ext;
1629 struct osc_extent *tmp;
1630 struct client_obd *cli = aa->aa_cli;
1633 rc = osc_brw_fini_request(req, rc);
1634 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1635 /* When server return -EINPROGRESS, client should always retry
1636 * regardless of the number of times the bulk was resent already. */
1637 if (osc_recoverable_error(rc)) {
/* Import generation changed: client was evicted, do not redo here --
 * recovery/resend is handled through the import state machine. */
1638 if (req->rq_import_generation !=
1639 req->rq_import->imp_generation) {
1640 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1641 ""DOSTID", rc = %d.\n",
1642 req->rq_import->imp_obd->obd_name,
1643 POSTID(&aa->aa_oa->o_oi), rc);
1644 } else if (rc == -EINPROGRESS ||
1645 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1646 rc = osc_brw_redo_request(req, aa, rc);
1648 CERROR("%s: too many resent retries for object: "
1649 ""LPU64":"LPU64", rc = %d.\n",
1650 req->rq_import->imp_obd->obd_name,
1651 POSTID(&aa->aa_oa->o_oi), rc);
1656 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: fold the attributes the OST returned in the reply obdo
 * into the cl_object's cached attributes. */
1661 struct obdo *oa = aa->aa_oa;
1662 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1663 unsigned long valid = 0;
1664 struct cl_object *obj;
1665 struct osc_async_page *last;
1667 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1668 obj = osc2cl(last->oap_obj);
1670 cl_object_attr_lock(obj);
1671 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1672 attr->cat_blocks = oa->o_blocks;
1673 valid |= CAT_BLOCKS;
1675 if (oa->o_valid & OBD_MD_FLMTIME) {
1676 attr->cat_mtime = oa->o_mtime;
1679 if (oa->o_valid & OBD_MD_FLATIME) {
1680 attr->cat_atime = oa->o_atime;
1683 if (oa->o_valid & OBD_MD_FLCTIME) {
1684 attr->cat_ctime = oa->o_ctime;
1688 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1689 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1690 loff_t last_off = last->oap_count + last->oap_obj_off +
1693 /* Change file size if this is an out of quota or
1694 * direct IO write and it extends the file size */
1695 if (loi->loi_lvb.lvb_size < last_off) {
1696 attr->cat_size = last_off;
1699 /* Extend KMS if it's not a lockless write */
1700 if (loi->loi_kms < last_off &&
1701 oap2osc_page(last)->ops_srvlock == 0) {
1702 attr->cat_kms = last_off;
1708 cl_object_attr_update(env, obj, attr, valid);
1709 cl_object_attr_unlock(obj);
1711 OBDO_FREE(aa->aa_oa);
/* Successful writes pin pages as "unstable" until the server commits
 * the transaction (see brw_commit / osc_dec_unstable_pages). */
1713 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1714 osc_inc_unstable_pages(req);
1716 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1717 list_del_init(&ext->oe_link);
1718 osc_extent_finish(env, ext, 1, rc);
1720 LASSERT(list_empty(&aa->aa_exts));
1721 LASSERT(list_empty(&aa->aa_oaps));
1723 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1724 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1726 spin_lock(&cli->cl_loi_list_lock);
1727 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1728 * is called so we know whether to go to sync BRWs or wait for more
1729 * RPCs to complete */
1730 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1731 cli->cl_w_in_flight--;
1733 cli->cl_r_in_flight--;
1734 osc_wake_cache_waiters(cli);
1735 spin_unlock(&cli->cl_loi_list_lock);
/* An RPC slot just freed up -- try to kick off more queued IO. */
1737 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW writes: called when the server transaction commits.
 * Releases the "unstable" page accounting taken in brw_interpret(), racing
 * safely against osc_inc_unstable_pages via rq_unstable under rq_lock.
 */
1741 static void brw_commit(struct ptlrpc_request *req)
1743 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1744 * this called via the rq_commit_cb, I need to ensure
1745 * osc_dec_unstable_pages is still called. Otherwise unstable
1746 * pages may be leaked. */
1747 spin_lock(&req->rq_lock);
1748 if (likely(req->rq_unstable)) {
1749 req->rq_unstable = 0;
1750 spin_unlock(&req->rq_lock);
/* Drop the lock before the potentially heavier accounting call. */
1752 osc_dec_unstable_pages(req);
/* Increment hasn't happened yet; mark committed so the other side
 * of the race knows not to add unstable accounting. */
1754 req->rq_committed = 1;
1755 spin_unlock(&req->rq_lock);
/*
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * Gathers all async pages from the extents into a sorted brw_page array,
 * prepares the BRW request, attaches interpret/commit callbacks and async
 * args, bumps the in-flight counters, and queues the RPC on ptlrpcd.
 * On failure all extents are finished with an error so nothing is leaked.
 */
1760 * Build an RPC by the list of extent @ext_list. The caller must ensure
1761 * that the total pages in this list are NOT over max pages per RPC.
1762 * Extents in the list must be in OES_RPC state.
1764 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1765 struct list_head *ext_list, int cmd)
1767 struct ptlrpc_request *req = NULL;
1768 struct osc_extent *ext;
1769 struct brw_page **pga = NULL;
1770 struct osc_brw_async_args *aa = NULL;
1771 struct obdo *oa = NULL;
1772 struct osc_async_page *oap;
1773 struct osc_object *obj = NULL;
1774 struct cl_req_attr *crattr = NULL;
1775 loff_t starting_offset = OBD_OBJECT_EOF;
1776 loff_t ending_offset = 0;
1780 bool soft_sync = false;
1781 bool interrupted = false;
1785 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1786 struct ost_body *body;
1788 LASSERT(!list_empty(ext_list));
1790 /* add pages into rpc_list to build BRW rpc */
1791 list_for_each_entry(ext, ext_list, oe_link) {
1792 LASSERT(ext->oe_state == OES_RPC);
1793 mem_tight |= ext->oe_memalloc;
1794 grant += ext->oe_grants;
1795 page_count += ext->oe_nr_pages;
1800 soft_sync = osc_over_unstable_soft_limit(cli);
/* Under memory pressure, allocate with the pressure flag set so the
 * request can make forward progress (restored at "out"). */
1802 mpflag = cfs_memory_pressure_get_and_set();
1804 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1806 GOTO(out, rc = -ENOMEM);
1810 GOTO(out, rc = -ENOMEM);
/* Flatten every extent's pending pages into pga[] / rpc_list and
 * compute the RPC's covered [starting_offset, ending_offset) range. */
1813 list_for_each_entry(ext, ext_list, oe_link) {
1814 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1816 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1818 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1819 pga[i] = &oap->oap_brw_page;
1820 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1823 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1824 if (starting_offset == OBD_OBJECT_EOF ||
1825 starting_offset > oap->oap_obj_off)
1826 starting_offset = oap->oap_obj_off;
1828 LASSERT(oap->oap_page_off == 0);
1829 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1830 ending_offset = oap->oap_obj_off +
1833 LASSERT(oap->oap_page_off + oap->oap_count ==
1835 if (oap->oap_interrupted)
1840 /* first page in the list */
1841 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1843 crattr = &osc_env_info(env)->oti_req_attr;
1844 memset(crattr, 0, sizeof(*crattr));
1845 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1846 crattr->cra_flags = ~0ULL;
1847 crattr->cra_page = oap2cl_page(oap);
1848 crattr->cra_oa = oa;
1849 cl_req_attr_set(env, osc2cl(obj), crattr);
1851 if (cmd == OBD_BRW_WRITE)
1852 oa->o_grant_used = grant;
/* OSTs allocate better when pages arrive in offset order. */
1854 sort_brw_pages(pga, page_count);
1855 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1857 CERROR("prep_req failed: %d\n", rc);
1861 req->rq_commit_cb = brw_commit;
1862 req->rq_interpret_reply = brw_interpret;
1863 req->rq_memalloc = mem_tight != 0;
1864 oap->oap_request = ptlrpc_request_addref(req);
1865 if (interrupted && !req->rq_intr)
1866 ptlrpc_mark_interrupted(req);
1868 /* Need to update the timestamps after the request is built in case
1869 * we race with setattr (locally or in queue at OST). If OST gets
1870 * later setattr before earlier BRW (as determined by the request xid),
1871 * the OST will not use BRW timestamps. Sadly, there is no obvious
1872 * way to do this in a single call. bug 10150 */
1873 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1874 crattr->cra_oa = &body->oa;
1875 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1876 cl_req_attr_set(env, osc2cl(obj), crattr);
1877 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1879 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1880 aa = ptlrpc_req_async_args(req);
/* Hand the page and extent lists over to the request's async args;
 * brw_interpret() owns and finishes them from here on. */
1881 INIT_LIST_HEAD(&aa->aa_oaps);
1882 list_splice_init(&rpc_list, &aa->aa_oaps);
1883 INIT_LIST_HEAD(&aa->aa_exts);
1884 list_splice_init(ext_list, &aa->aa_exts);
1886 spin_lock(&cli->cl_loi_list_lock);
1887 starting_offset >>= PAGE_CACHE_SHIFT;
1888 if (cmd == OBD_BRW_READ) {
1889 cli->cl_r_in_flight++;
1890 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1891 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1892 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1893 starting_offset + 1);
1895 cli->cl_w_in_flight++;
1896 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1897 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1898 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1899 starting_offset + 1);
1901 spin_unlock(&cli->cl_loi_list_lock);
1903 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1904 page_count, aa, cli->cl_r_in_flight,
1905 cli->cl_w_in_flight);
1907 ptlrpcd_add_req(req);
1913 cfs_memory_pressure_restore(mpflag);
/* Error path: nothing was queued; release resources and fail extents. */
1916 LASSERT(req == NULL);
1921 OBD_FREE(pga, sizeof(*pga) * page_count);
1922 /* this should happen rarely and is pretty bad, it makes the
1923 * pending list not follow the dirty order */
1924 while (!list_empty(ext_list)) {
1925 ext = list_entry(ext_list->next, struct osc_extent,
1927 list_del_init(&ext->oe_link);
1928 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach OSC private data (einfo->ei_cbdata) to a DLM lock, verifying the
 * lock's callbacks/type match what this OSC would have set up.
 *
 * NOTE(review): the return value lines are elided in this chunk -- from the
 * structure it presumably returns non-zero when l_ast_data ends up equal to
 * our data (set by us or already ours); confirm against the full source.
 */
1934 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1935 struct ldlm_enqueue_info *einfo)
1937 void *data = einfo->ei_cbdata;
1940 LASSERT(lock != NULL);
1941 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1942 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1943 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1944 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1946 lock_res_and_lock(lock);
/* Only claim the slot if it is free; never overwrite another user. */
1948 if (lock->l_ast_data == NULL)
1949 lock->l_ast_data = data;
1950 if (lock->l_ast_data == data)
1953 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve the
 * lock handle, set/verify the ast data, and drop the lock reference.
 * Logs an error (likely client eviction) if the handle no longer resolves.
 */
1958 static int osc_set_data_with_check(struct lustre_handle *lockh,
1959 struct ldlm_enqueue_info *einfo)
1961 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1965 set = osc_set_lock_data_with_check(lock, einfo);
1966 LDLM_LOCK_PUT(lock);
1968 CERROR("lockh %p, data %p - client evicted?\n",
1969 lockh, einfo->ei_cbdata);
/*
 * Complete an OSC lock enqueue: translate an intent-aborted reply into its
 * real status, mark the LVB ready where appropriate, invoke the caller's
 * upcall with the final errcode, then release the enqueue's lock reference.
 *
 * \param agl  speculative (asynchronous glimpse lock) enqueue flag;
 *             its use in the elided lines is not visible in this chunk.
 */
1973 static int osc_enqueue_fini(struct ptlrpc_request *req,
1974 osc_enqueue_upcall_f upcall, void *cookie,
1975 struct lustre_handle *lockh, enum ldlm_mode mode,
1976 __u64 *flags, int agl, int errcode)
1978 bool intent = *flags & LDLM_FL_HAS_INTENT;
1982 /* The request was created before ldlm_cli_enqueue call. */
1983 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1984 struct ldlm_reply *rep;
1986 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1987 LASSERT(rep != NULL);
/* lock_policy_res1 carries the intent's real return code in
 * network byte order of ptlrpc status encoding. */
1989 rep->lock_policy_res1 =
1990 ptlrpc_status_ntoh(rep->lock_policy_res1);
1991 if (rep->lock_policy_res1)
1992 errcode = rep->lock_policy_res1;
1994 *flags |= LDLM_FL_LVB_READY;
1995 } else if (errcode == ELDLM_OK) {
1996 *flags |= LDLM_FL_LVB_READY;
1999 /* Call the update callback. */
2000 rc = (*upcall)(cookie, lockh, errcode);
2002 /* release the reference taken in ldlm_cli_enqueue() */
2003 if (errcode == ELDLM_LOCK_MATCHED)
2005 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2006 ldlm_lock_decref(lockh, mode);
/*
 * Interpret callback for an asynchronous DLM lock enqueue.
 *
 * Re-resolves the lock from the handle saved in the async args, takes an
 * extra reference to order any failure-path blocking AST after the upcall,
 * then finishes the enqueue (ldlm_cli_enqueue_fini + osc_enqueue_fini)
 * before dropping both references.
 */
2011 static int osc_enqueue_interpret(const struct lu_env *env,
2012 struct ptlrpc_request *req,
2013 struct osc_enqueue_args *aa, int rc)
2015 struct ldlm_lock *lock;
2016 struct lustre_handle *lockh = &aa->oa_lockh;
2017 enum ldlm_mode mode = aa->oa_mode;
2018 struct ost_lvb *lvb = aa->oa_lvb;
2019 __u32 lvb_len = sizeof(*lvb);
2024 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2026 lock = ldlm_handle2lock(lockh);
2027 LASSERTF(lock != NULL,
2028 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2029 lockh->cookie, req, aa);
2031 /* Take an additional reference so that a blocking AST that
2032 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2033 * to arrive after an upcall has been executed by
2034 * osc_enqueue_fini(). */
2035 ldlm_lock_addref(lockh, mode);
2037 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2038 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2040 /* Let CP AST to grant the lock first. */
2041 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL (speculative) enqueues carry no lvb/flags pointers; supply a
 * local flags variable for the fini calls below. */
2044 LASSERT(aa->oa_lvb == NULL);
2045 LASSERT(aa->oa_flags == NULL);
2046 aa->oa_flags = &flags;
2049 /* Complete obtaining the lock procedure. */
2050 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2051 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2053 /* Complete osc stuff. */
2054 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2055 aa->oa_flags, aa->oa_agl, rc);
2057 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the ordering reference taken above and the handle2lock ref. */
2059 ldlm_lock_decref(lockh, mode);
2060 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: "queue on ptlrpcd instead of a real set". */
2064 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2066 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2067 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2068 * other synchronous requests, however keeping some locks and trying to obtain
2069 * others may take a considerable amount of time in a case of ost failure; and
2070 * when other sync requests do not get released lock from a client, the client
2071 * is evicted from the cluster -- such scenarious make the life difficult, so
2072 * release locks just after they are obtained. */
/*
 * Enqueue an extent DLM lock on an OST object, first trying to match an
 * already-cached compatible lock (including a PW lock for readers).
 *
 * \param flags in/out LDLM_FL_* flags; LDLM_FL_LVB_READY is set when the
 *              lock's LVB can be trusted
 * \param kms_valid  non-zero iff cached kms may be used (object not fresh
 *              and not evicted); the early-return on !kms_valid is elided
 *              from this chunk
 * \param async if set, the RPC completes via osc_enqueue_interpret() on
 *              @rqset (or ptlrpcd when rqset == PTLRPCD_SET)
 * \param agl   speculative (async glimpse lock) enqueue -- a matched lock
 *              merely cancels the AGL for this stripe
 */
2073 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2074 __u64 *flags, union ldlm_policy_data *policy,
2075 struct ost_lvb *lvb, int kms_valid,
2076 osc_enqueue_upcall_f upcall, void *cookie,
2077 struct ldlm_enqueue_info *einfo,
2078 struct ptlrpc_request_set *rqset, int async, int agl)
2080 struct obd_device *obd = exp->exp_obd;
2081 struct lustre_handle lockh = { 0 };
2082 struct ptlrpc_request *req = NULL;
2083 int intent = *flags & LDLM_FL_HAS_INTENT;
2084 __u64 match_flags = *flags;
2085 enum ldlm_mode mode;
2089 /* Filesystem lock extents are extended to page boundaries so that
2090 * dealing with the page cache is a little smoother. */
2091 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2092 policy->l_extent.end |= ~PAGE_MASK;
2095 * kms is not valid when either object is completely fresh (so that no
2096 * locks are cached), or object was evicted. In the latter case cached
2097 * lock cannot be used, because it would prime inode state with
2098 * potentially stale LVB.
2103 /* Next, search for already existing extent locks that will cover us */
2104 /* If we're trying to read, we also search for an existing PW lock. The
2105 * VFS and page cache already protect us locally, so lots of readers/
2106 * writers can share a single PW lock.
2108 * There are problems with conversion deadlocks, so instead of
2109 * converting a read lock to a write lock, we'll just enqueue a new
2112 * At some point we should cancel the read lock instead of making them
2113 * send us a blocking callback, but there are problems with canceling
2114 * locks out from other users right now, too. */
2115 mode = einfo->ei_mode;
2116 if (einfo->ei_mode == LCK_PR)
2119 match_flags |= LDLM_FL_LVB_READY;
2121 match_flags |= LDLM_FL_BLOCK_GRANTED;
2122 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2123 einfo->ei_type, policy, mode, &lockh, 0);
/* A compatible cached lock matched. */
2125 struct ldlm_lock *matched;
2127 if (*flags & LDLM_FL_TEST_LOCK)
2130 matched = ldlm_handle2lock(&lockh);
2132 /* AGL enqueues DLM locks speculatively. Therefore if
2133 * it already exists a DLM lock, it wll just inform the
2134 * caller to cancel the AGL process for this stripe. */
2135 ldlm_lock_decref(&lockh, mode);
2136 LDLM_LOCK_PUT(matched);
2138 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2139 *flags |= LDLM_FL_LVB_READY;
2141 /* We already have a lock, and it's referenced. */
2142 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2144 ldlm_lock_decref(&lockh, mode);
2145 LDLM_LOCK_PUT(matched);
/* Lock belongs to another user's data: fall through to enqueue. */
2148 ldlm_lock_decref(&lockh, mode);
2149 LDLM_LOCK_PUT(matched);
2154 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No match -- build a real LDLM_ENQUEUE request with room for an LVB. */
2158 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2159 &RQF_LDLM_ENQUEUE_LVB);
2163 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2165 ptlrpc_request_free(req);
2169 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2171 ptlrpc_request_set_replen(req);
2174 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2175 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2177 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2178 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash completion state in the request's async args and
 * let osc_enqueue_interpret() finish the job. */
2181 struct osc_enqueue_args *aa;
2182 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2183 aa = ptlrpc_req_async_args(req);
2185 aa->oa_mode = einfo->ei_mode;
2186 aa->oa_type = einfo->ei_type;
2187 lustre_handle_copy(&aa->oa_lockh, &lockh);
2188 aa->oa_upcall = upcall;
2189 aa->oa_cookie = cookie;
2192 aa->oa_flags = flags;
2195 /* AGL is essentially to enqueue an DLM lock
2196 * in advance, so we don't care about the
2197 * result of AGL enqueue. */
2199 aa->oa_flags = NULL;
2202 req->rq_interpret_reply =
2203 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2204 if (rqset == PTLRPCD_SET)
2205 ptlrpcd_add_req(req);
2207 ptlrpc_set_add_req(rqset, req);
2208 } else if (intent) {
2209 ptlrpc_req_finished(req);
/* Sync path: complete inline. */
2214 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2217 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock covering [start, end] on @res_id,
 * without enqueuing a new one.  Read requests may also be satisfied by an
 * existing PW lock.  On a match the lock's ast data is set/verified; a lock
 * matched in a stronger mode than requested has its reference converted
 * (addref PR / decref PW) unless this is only a test-lock probe.
 *
 * NOTE(review): the mode-selection lines before the ldlm_lock_match() call
 * are elided here ("rc" is being used as the mode set to match); the exact
 * modes tried are not visible in this chunk.
 */
2222 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2223 enum ldlm_type type, union ldlm_policy_data *policy,
2224 enum ldlm_mode mode, __u64 *flags, void *data,
2225 struct lustre_handle *lockh, int unref)
2227 struct obd_device *obd = exp->exp_obd;
2228 __u64 lflags = *flags;
2232 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2235 /* Filesystem lock extents are extended to page boundaries so that
2236 * dealing with the page cache is a little smoother */
2237 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2238 policy->l_extent.end |= ~PAGE_MASK;
2240 /* Next, search for already existing extent locks that will cover us */
2241 /* If we're trying to read, we also search for an existing PW lock. The
2242 * VFS and page cache already protect us locally, so lots of readers/
2243 * writers can share a single PW lock. */
2247 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2248 res_id, type, policy, rc, lockh, unref)
2251 if (!osc_set_data_with_check(lockh, data)) {
2252 if (!(lflags & LDLM_FL_TEST_LOCK))
2253 ldlm_lock_decref(lockh, rc);
/* Matched PW while asking for PR: shift the reference so the caller
 * holds the mode it asked for. */
2257 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2258 ldlm_lock_addref(lockh, LCK_PR);
2259 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Interpret callback for async OST_STATFS: unpack the obd_statfs reply into
 * the caller-supplied obd_info buffer and invoke its oi_cb_up callback.
 * -ENOTCONN/-EAGAIN are tolerated for OBD_STATFS_NODELAY callers (the reply
 * was never sent / target busy).
 */
2266 static int osc_statfs_interpret(const struct lu_env *env,
2267 struct ptlrpc_request *req,
2268 struct osc_async_args *aa, int rc)
2270 struct obd_statfs *msfs;
2274 /* The request has in fact never been sent
2275 * due to issues at a higher level (LOV).
2276 * Exit immediately since the caller is
2277 * aware of the problem and takes care
2278 * of the clean up */
2281 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2282 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2288 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2290 GOTO(out, rc = -EPROTO);
/* Struct copy of the unpacked statfs into the caller's buffer. */
2293 *aa->aa_oi->oi_osfs = *msfs;
2295 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_STATFS RPC; completion is delivered through
 * osc_statfs_interpret() into oinfo->oi_cb_up.  The request is added to
 * @rqset rather than waited on.  max_age is currently not transmitted
 * (see the comment below).
 */
2299 static int osc_statfs_async(struct obd_export *exp,
2300 struct obd_info *oinfo, __u64 max_age,
2301 struct ptlrpc_request_set *rqset)
2303 struct obd_device *obd = class_exp2obd(exp);
2304 struct ptlrpc_request *req;
2305 struct osc_async_args *aa;
2309 /* We could possibly pass max_age in the request (as an absolute
2310 * timestamp or a "seconds.usec ago") so the target can avoid doing
2311 * extra calls into the filesystem if that isn't necessary (e.g.
2312 * during mount that would help a bit). Having relative timestamps
2313 * is not so great if request processing is slow, while absolute
2314 * timestamps are not ideal because they need time synchronization. */
2315 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2319 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2321 ptlrpc_request_free(req);
2324 ptlrpc_request_set_replen(req);
/* statfs is served from the OST's create portal. */
2325 req->rq_request_portal = OST_CREATE_PORTAL;
2326 ptlrpc_at_set_req_timeout(req);
2328 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2329 /* procfs requests not want stat in wait for avoid deadlock */
2330 req->rq_no_resend = 1;
2331 req->rq_no_delay = 1;
2334 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2335 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2336 aa = ptlrpc_req_async_args(req);
2339 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the RPC with ptlrpc_queue_wait() and copy
 * the reply into @osfs.  Takes a reference on the import under cl_sem to
 * guard against a concurrent disconnect (bug 15684).  max_age is currently
 * unused on the wire (see comment below).
 */
2343 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2344 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2346 struct obd_device *obd = class_exp2obd(exp);
2347 struct obd_statfs *msfs;
2348 struct ptlrpc_request *req;
2349 struct obd_import *imp = NULL;
2353 /*Since the request might also come from lprocfs, so we need
2354 *sync this with client_disconnect_export Bug15684*/
2355 down_read(&obd->u.cli.cl_sem);
2356 if (obd->u.cli.cl_import)
2357 imp = class_import_get(obd->u.cli.cl_import);
2358 up_read(&obd->u.cli.cl_sem);
2362 /* We could possibly pass max_age in the request (as an absolute
2363 * timestamp or a "seconds.usec ago") so the target can avoid doing
2364 * extra calls into the filesystem if that isn't necessary (e.g.
2365 * during mount that would help a bit). Having relative timestamps
2366 * is not so great if request processing is slow, while absolute
2367 * timestamps are not ideal because they need time synchronization. */
2368 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* Import reference only needed for allocation; drop it right away. */
2370 class_import_put(imp);
2375 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2377 ptlrpc_request_free(req);
2380 ptlrpc_request_set_replen(req);
2381 req->rq_request_portal = OST_CREATE_PORTAL;
2382 ptlrpc_at_set_req_timeout(req);
2384 if (flags & OBD_STATFS_NODELAY) {
2385 /* procfs requests not want stat in wait for avoid deadlock */
2386 req->rq_no_resend = 1;
2387 req->rq_no_delay = 1;
2390 rc = ptlrpc_queue_wait(req);
2394 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2396 GOTO(out, rc = -EPROTO);
2403 ptlrpc_req_finished(req);
/*
 * osc_iocontrol() - ioctl dispatcher for the OSC device.
 *
 * Pins this module for the duration of the call (try_module_get/module_put)
 * so it cannot be unloaded mid-ioctl, then dispatches on @cmd:
 * recover/activate the import, ping the target, or reject with -ENOTTY.
 */
2407 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2408			 void *karg, void __user *uarg)
2410	struct obd_device *obd = exp->exp_obd;
2411	struct obd_ioctl_data *data = karg;
2415	if (!try_module_get(THIS_MODULE)) {
2416		CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2417		       module_name(THIS_MODULE));
	/* Force reconnection/recovery of this client's import. */
2421	case OBD_IOC_CLIENT_RECOVER:
2422		err = ptlrpc_recover_import(obd->u.cli.cl_import,
2423					    data->ioc_inlbuf1, 0);
	/* Administratively (de)activate the import. */
2427	case IOC_OSC_SET_ACTIVE:
2428		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2431	case OBD_IOC_PING_TARGET:
2432		err = ptlrpc_obd_ping(obd);
2435		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2436		       cmd, current_comm());
2437		GOTO(out, err = -ENOTTY);
	/* Balance the try_module_get() above on every exit path. */
2440	module_put(THIS_MODULE);
/*
 * osc_set_info_async() - set a named parameter on this OSC, either locally
 * or by forwarding an OST_SET_INFO RPC to the server.
 *
 * Keys handled entirely on the client (no RPC): KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK.
 * All other keys are packed into an OST_SET_INFO request; KEY_GRANT_SHRINK
 * additionally carries an ost_body and is sent via ptlrpcd with a grant
 * interpret callback, while every other key requires a caller-supplied
 * request @set.
 */
2444 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2445			      u32 keylen, void *key,
2446			      u32 vallen, void *val,
2447			      struct ptlrpc_request_set *set)
2449	struct ptlrpc_request *req;
2450	struct obd_device     *obd = exp->exp_obd;
2451	struct obd_import     *imp = class_exp2cliimp(exp);
2456	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
	/* Toggle BRW checksums; value must be exactly an int. */
2458	if (KEY_IS(KEY_CHECKSUM)) {
2459		if (vallen != sizeof(int))
2461		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2465	if (KEY_IS(KEY_SPTLRPC_CONF)) {
2466		sptlrpc_conf_client_adapt(obd);
2470	if (KEY_IS(KEY_FLUSH_CTX)) {
2471		sptlrpc_import_flush_my_ctx(imp);
	/* Attach this OSC to a shared client-side page cache (set once). */
2475	if (KEY_IS(KEY_CACHE_SET)) {
2476		struct client_obd *cli = &obd->u.cli;
2478		LASSERT(cli->cl_cache == NULL); /* only once */
2479		cli->cl_cache = (struct cl_client_cache *)val;
2480		cl_cache_incref(cli->cl_cache);
2481		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2483		/* add this osc into entity list */
2484		LASSERT(list_empty(&cli->cl_lru_osc));
2485		spin_lock(&cli->cl_cache->ccc_lru_lock);
2486		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2487		spin_unlock(&cli->cl_cache->ccc_lru_lock);
	/* Shrink the LRU: at most half of what is in the list, capped by
	 * the caller-requested target. */
2492	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2493		struct client_obd *cli = &obd->u.cli;
2494		long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2495		long target = *(long *)val;
2497		nr = osc_lru_shrink(env, cli, min(nr, target), true);
	/* Everything past this point needs an RPC; only GRANT_SHRINK may
	 * run without a caller-provided request set. */
2502	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2505        /* We pass all other commands directly to OST. Since nobody calls osc
2506           methods directly and everybody is supposed to go through LOV, we
2507           assume lov checked invalid values for us.
2508           The only recognised values so far are evict_by_nid and mds_conn.
2509           Even if something bad goes through, we'd get a -EINVAL from OST
2512	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2513						&RQF_OST_SET_GRANT_INFO :
2518	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2519			     RCL_CLIENT, keylen);
2520	if (!KEY_IS(KEY_GRANT_SHRINK))
2521		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2522				     RCL_CLIENT, vallen);
2523	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2525		ptlrpc_request_free(req);
	/* Copy key and value into the request buffers. */
2529	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2530	memcpy(tmp, key, keylen);
2531	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2534	memcpy(tmp, val, vallen);
2536	if (KEY_IS(KEY_GRANT_SHRINK)) {
2537		struct osc_grant_args *aa;
	/* Stash async args in the request's embedded scratch space. */
2540		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2541		aa = ptlrpc_req_async_args(req);
2544			ptlrpc_req_finished(req);
2547		*oa = ((struct ost_body *)val)->oa;
2549		req->rq_interpret_reply = osc_shrink_grant_interpret;
2552	ptlrpc_request_set_replen(req);
2553	if (!KEY_IS(KEY_GRANT_SHRINK)) {
2554		LASSERT(set != NULL);
2555		ptlrpc_set_add_req(set, req);
2556		ptlrpc_check_set(NULL, set);
	/* GRANT_SHRINK is fire-and-forget via the ptlrpcd daemon. */
2558		ptlrpcd_add_req(req);
/*
 * osc_reconnect() - compute the grant to request when (re)connecting to
 * the OST.
 *
 * Under cl_loi_list_lock, sums available + reserved grant (plus dirty
 * grant or dirty pages in bytes, depending on OBD_CONNECT_GRANT_PARAM)
 * into data->ocd_grant, defaulting to 2 BRW sizes when the sum is zero,
 * and consumes/clears cl_lost_grant.
 */
2564 static int osc_reconnect(const struct lu_env *env,
2565			 struct obd_export *exp, struct obd_device *obd,
2566			 struct obd_uuid *cluuid,
2567			 struct obd_connect_data *data,
2570	struct client_obd *cli = &obd->u.cli;
2572	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
	/* Grant bookkeeping is protected by the LOI list lock. */
2576		spin_lock(&cli->cl_loi_list_lock);
2577		grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2578		if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2579			grant += cli->cl_dirty_grant;
	/* Without GRANT_PARAM the server accounts in whole pages. */
2581			grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
2582		data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2583		lost_grant = cli->cl_lost_grant;
2584		cli->cl_lost_grant = 0;
2585		spin_unlock(&cli->cl_loi_list_lock);
2587		CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2588		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2589		       data->ocd_version, data->ocd_grant, lost_grant);
/*
 * osc_disconnect() - disconnect this OSC's export, then remove the client
 * from the grant-shrink list.
 *
 * The ordering (disconnect first, shrink-list removal after, and only once
 * cl_import is NULL) is deliberate — see the BUG18662 race described below.
 */
2595 static int osc_disconnect(struct obd_export *exp)
2597	struct obd_device *obd = class_exp2obd(exp);
2600	rc = client_disconnect_export(exp);
2602	 * Initially we put del_shrink_grant before disconnect_export, but it
2603	 * causes the following problem if setup (connect) and cleanup
2604	 * (disconnect) are tangled together.
2605	 *      connect p1                     disconnect p2
2606	 *   ptlrpc_connect_import
2607	 *     ...............               class_manual_cleanup
2610	 *   ptlrpc_connect_interrupt
2612	 *     add this client to shrink list
2614	 * Bang! pinger trigger the shrink.
2615	 * So the osc should be disconnected from the shrink list, after we
2616	 * are sure the import has been destroyed. BUG18662
2618	if (obd->u.cli.cl_import == NULL)
2619		osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_ldlm_resource_invalidate() - cfs_hash iterator callback run over the
 * namespace's resource hash during import invalidation.
 *
 * For each resource: grab (and reference) the osc_object attached to the
 * first granted lock that has l_ast_data, clear LDLM_FL_CLEANED on every
 * granted lock so the second ldlm_namespace_cleanup() pass in
 * osc_import_event() can cancel them, then invalidate the object.
 */
2623 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2624	struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2626	struct lu_env *env = arg;
2627	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2628	struct ldlm_lock *lock;
2629	struct osc_object *osc = NULL;
2633	list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2634		if (lock->l_ast_data != NULL && osc == NULL) {
2635			osc = lock->l_ast_data;
2636			cl_object_get(osc2cl(osc));
2639		/* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2640		 * by the 2nd round of ldlm_namespace_clean() call in
2641		 * osc_import_event(). */
2642		ldlm_clear_cleaned(lock);
2647		osc_object_invalidate(env, osc);
	/* Drop the reference taken above while walking lr_granted. */
2648		cl_object_put(env, osc2cl(osc));
/*
 * osc_import_event() - react to import state-machine events.
 *
 * DISCON zeroes the grant accounting; INVALIDATE cleans the LDLM namespace
 * (twice, around invalidating every cached osc_object — see
 * osc_ldlm_resource_invalidate()); OCD (re)initializes grant and request
 * portal from the negotiated connect data; the remaining events are
 * forwarded to the observer.
 */
2654 static int osc_import_event(struct obd_device *obd,
2655			    struct obd_import *imp,
2656			    enum obd_import_event event)
2658	struct client_obd *cli;
2662	LASSERT(imp->imp_obd == obd);
2665	case IMP_EVENT_DISCON: {
	/* All outstanding grant is meaningless after a disconnect. */
2667		spin_lock(&cli->cl_loi_list_lock);
2668		cli->cl_avail_grant = 0;
2669		cli->cl_lost_grant = 0;
2670		spin_unlock(&cli->cl_loi_list_lock);
2673	case IMP_EVENT_INACTIVE: {
2674		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2677	case IMP_EVENT_INVALIDATE: {
2678		struct ldlm_namespace *ns = obd->obd_namespace;
	/* First pass: cancel what can be cancelled locally. */
2682		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2684		env = cl_env_get(&refcheck);
	/* Flush pending pages, then invalidate every cached object. */
2686			osc_io_unplug(env, &obd->u.cli, NULL);
2688			cfs_hash_for_each_nolock(ns->ns_rs_hash,
2689						 osc_ldlm_resource_invalidate,
2691			cl_env_put(env, &refcheck);
	/* Second pass: catches locks un-CLEANED by the invalidate walk. */
2693		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2698	case IMP_EVENT_ACTIVE: {
2699		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2702	case IMP_EVENT_OCD: {
2703		struct obd_connect_data *ocd = &imp->imp_connect_data;
2705		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2706			osc_init_grant(&obd->u.cli, ocd);
2709		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2710			imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2712		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2715	case IMP_EVENT_DEACTIVATE: {
2716		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2719	case IMP_EVENT_ACTIVATE: {
2720		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2724		CERROR("Unknown import event %d\n", event);
2731  * Determine whether the lock can be canceled before replaying the lock
2732  * during recovery, see bug16774 for detailed information.
2734  * \retval zero the lock can't be canceled
2735  * \retval other ok to cancel
2737 static int osc_cancel_weight(struct ldlm_lock *lock)
2740	 * Cancel all unused and granted extent lock.
	/* Only fully-granted extent locks with zero weight (no users,
	 * per osc_ldlm_weigh_ast()) are safe to drop before replay. */
2742	if (lock->l_resource->lr_type == LDLM_EXTENT &&
2743	    lock->l_granted_mode == lock->l_req_mode &&
2744	    osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work() - ptlrpcd work callback (registered in osc_setup() as
 * cl_writeback_work) that kicks writeback for this client obd.
 */
2750 static int brw_queue_work(const struct lu_env *env, void *data)
2752	struct client_obd *cli = data;
2754	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2756	osc_io_unplug(env, cli, NULL);
/*
 * osc_setup() - device setup for the OSC obd.
 *
 * In order: take a ptlrpcd reference, do generic client setup, allocate the
 * writeback and LRU ptlrpcd work items, set up quota, register procfs
 * entries (under the OSP's proc directory when client and server share a
 * node), grow the shared request pool toward osc_reqpool_maxreqcount,
 * register the cancel-weight callback, and join the global shrink list.
 * The cleanup labels at the bottom unwind in reverse on failure.
 */
2760 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2762	struct client_obd *cli = &obd->u.cli;
2763	struct obd_type *type;
2771	rc = ptlrpcd_addref();
2775	rc = client_obd_setup(obd, lcfg);
2777		GOTO(out_ptlrpcd, rc);
	/* Writeback work item, run in ptlrpcd context (brw_queue_work). */
2779	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2780	if (IS_ERR(handler))
2781		GOTO(out_client_setup, rc = PTR_ERR(handler));
2782	cli->cl_writeback_work = handler;
	/* LRU shrink work item. */
2784	handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2785	if (IS_ERR(handler))
2786		GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2787	cli->cl_lru_work = handler;
2789	rc = osc_quota_setup(obd);
2791		GOTO(out_ptlrpcd_work, rc);
2793	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2795 #ifdef CONFIG_PROC_FS
2796	obd->obd_vars = lprocfs_osc_obd_vars;
2798	/* If this is true then both client (osc) and server (osp) are on the
2799	 * same node. The osp layer if loaded first will register the osc proc
2800	 * directory. In that case this obd_device will be attached its proc
2801	 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2802	type = class_search_type(LUSTRE_OSP_NAME);
2803	if (type && type->typ_procsym) {
2804		obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2806						       obd->obd_vars, obd);
2807		if (IS_ERR(obd->obd_proc_entry)) {
2808			rc = PTR_ERR(obd->obd_proc_entry);
2809			CERROR("error %d setting up lprocfs for %s\n", rc,
	/* procfs failure is non-fatal: continue without the entry. */
2811			obd->obd_proc_entry = NULL;
2814		rc = lprocfs_obd_setup(obd);
2817	/* If the basic OSC proc tree construction succeeded then
2818	 * lets do the rest. */
2820		lproc_osc_attach_seqstat(obd);
2821		sptlrpc_lprocfs_cliobd_attach(obd);
2822		ptlrpc_lprocfs_register_obd(obd);
2826	 * We try to control the total number of requests with a upper limit
2827	 * osc_reqpool_maxreqcount. There might be some race which will cause
2828	 * over-limit allocation, but it is fine.
2830	req_count = atomic_read(&osc_pool_req_count);
2831	if (req_count < osc_reqpool_maxreqcount) {
	/* +2 leaves headroom beyond the per-client RPC-in-flight cap. */
2832		adding = cli->cl_max_rpcs_in_flight + 2;
2833		if (req_count + adding > osc_reqpool_maxreqcount)
2834			adding = osc_reqpool_maxreqcount - req_count;
2836		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2837		atomic_add(added, &osc_pool_req_count);
2840	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2841	ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
	/* Make this client visible to the global cache shrinker. */
2843	spin_lock(&osc_shrink_lock);
2844	list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2845	spin_unlock(&osc_shrink_lock);
	/* Error unwind: destroy work items, then generic client cleanup. */
2850	if (cli->cl_writeback_work != NULL) {
2851		ptlrpcd_destroy_work(cli->cl_writeback_work);
2852		cli->cl_writeback_work = NULL;
2854	if (cli->cl_lru_work != NULL) {
2855		ptlrpcd_destroy_work(cli->cl_lru_work);
2856		cli->cl_lru_work = NULL;
2859	client_obd_cleanup(obd);
/*
 * osc_precleanup() - first stage of teardown, before osc_cleanup().
 *
 * Waits out the zombie-export thread (echo-client case, see below),
 * destroys the ptlrpcd work items created in osc_setup(), drops the
 * import, and unregisters the procfs entries.
 */
2865 static int osc_precleanup(struct obd_device *obd)
2867	struct client_obd *cli = &obd->u.cli;
2871	 * for echo client, export may be on zombie list, wait for
2872	 * zombie thread to cull it, because cli.cl_import will be
2873	 * cleared in client_disconnect_export():
2874	 *   class_export_destroy() -> obd_cleanup() ->
2875	 *   echo_device_free() -> echo_client_cleanup() ->
2876	 *   obd_disconnect() -> osc_disconnect() ->
2877	 *   client_disconnect_export()
2879	obd_zombie_barrier();
2880	if (cli->cl_writeback_work) {
2881		ptlrpcd_destroy_work(cli->cl_writeback_work);
2882		cli->cl_writeback_work = NULL;
2885	if (cli->cl_lru_work) {
2886		ptlrpcd_destroy_work(cli->cl_lru_work);
2887		cli->cl_lru_work = NULL;
2890	obd_cleanup_client_import(obd);
2891	ptlrpc_lprocfs_unregister_obd(obd);
2892	lprocfs_obd_cleanup(obd);
/*
 * osc_cleanup() - final teardown: leave the shrink list, detach from the
 * shared client cache (undoing KEY_CACHE_SET), release quota state, and
 * finish with generic client cleanup.
 */
2896 int osc_cleanup(struct obd_device *obd)
2898	struct client_obd *cli = &obd->u.cli;
	/* Stop the global shrinker from seeing this client. */
2903	spin_lock(&osc_shrink_lock);
2904	list_del(&cli->cl_shrink_list);
2905	spin_unlock(&osc_shrink_lock);
	/* lru cleanup: detach from the cache set up via KEY_CACHE_SET. */
2908	if (cli->cl_cache != NULL) {
2909		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2910		spin_lock(&cli->cl_cache->ccc_lru_lock);
2911		list_del_init(&cli->cl_lru_osc);
2912		spin_unlock(&cli->cl_cache->ccc_lru_lock);
2913		cli->cl_lru_left = NULL;
2914		cl_cache_decref(cli->cl_cache);
2915		cli->cl_cache = NULL;
2918	/* free memory of osc quota cache */
2919	osc_quota_cleanup(obd);
2921	rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base() - apply a PARAM_OSC config record via procfs
 * parameter handling.  class_process_proc_param() returns >0 for "handled";
 * normalize that to 0 so callers only see 0 or a negative errno.
 */
2927 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2929	int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2930	return rc > 0 ? 0: rc;
/* obd_ops entry point: thin wrapper over osc_process_config_base(). */
2933 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2935	return osc_process_config_base(obd, buf);
/* obd_ops method table for the OSC: connection handling is shared with the
 * generic client code; setup/cleanup, statfs, object and attribute ops,
 * ioctl, set_info and import-event handling are OSC-specific. */
2938 static struct obd_ops osc_obd_ops = {
2939	.o_owner                = THIS_MODULE,
2940	.o_setup                = osc_setup,
2941	.o_precleanup           = osc_precleanup,
2942	.o_cleanup              = osc_cleanup,
2943	.o_add_conn             = client_import_add_conn,
2944	.o_del_conn             = client_import_del_conn,
2945	.o_connect              = client_connect_import,
2946	.o_reconnect            = osc_reconnect,
2947	.o_disconnect           = osc_disconnect,
2948	.o_statfs               = osc_statfs,
2949	.o_statfs_async         = osc_statfs_async,
2950	.o_create               = osc_create,
2951	.o_destroy              = osc_destroy,
2952	.o_getattr              = osc_getattr,
2953	.o_setattr              = osc_setattr,
2954	.o_iocontrol            = osc_iocontrol,
2955	.o_set_info_async       = osc_set_info_async,
2956	.o_import_event         = osc_import_event,
2957	.o_process_config       = osc_process_config,
2958	.o_quotactl             = osc_quotactl,
/* Global LRU shrinker state: the registered shrinker handle, and the list
 * of all client_obds eligible for shrinking, protected by osc_shrink_lock. */
2961 static struct shrinker *osc_cache_shrinker;
2962 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2963 DEFINE_SPINLOCK(osc_shrink_lock);
2965 #ifndef HAVE_SHRINKER_COUNT
/* Compat wrapper for old kernels whose shrinker API uses a single shrink()
 * callback instead of separate count/scan: run a scan pass, then return the
 * remaining count. */
2966 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2968	struct shrink_control scv = {
2969		.nr_to_scan = shrink_param(sc, nr_to_scan),
2970		.gfp_mask   = shrink_param(sc, gfp_mask)
2972 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2973	struct shrinker *shrinker = NULL;
2976	(void)osc_cache_shrink_scan(shrinker, &scv);
2978	return osc_cache_shrink_count(shrinker, &scv);
/*
 * osc_init() - module init.
 *
 * Initializes the lu_kmem caches, registers the OSC obd type (suppressing
 * proc registration when the OSP already owns the osc proc directory),
 * registers the cache shrinker, validates osc_reqpool_mem_max, sizes the
 * shared request pool (pool bytes / rounded-up max IO request size), and
 * creates the pool.  Unwinds type registration and caches on failure.
 */
2982 static int __init osc_init(void)
2984	bool enable_proc = true;
2985	struct obd_type *type;
2986	unsigned int reqpool_size;
2987	unsigned int reqsize;
2989	DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2990			 osc_cache_shrink_count, osc_cache_shrink_scan);
2993	/* print an address of _any_ initialized kernel symbol from this
2994	 * module, to allow debugging with gdb that doesn't support data
2995	 * symbols from modules.*/
2996	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2998	rc = lu_kmem_init(osc_caches);
	/* If the OSP type already registered the osc proc dir (client and
	 * server on one node), skip proc registration here. */
3002	type = class_search_type(LUSTRE_OSP_NAME);
3003	if (type != NULL && type->typ_procsym != NULL)
3004		enable_proc = false;
3006	rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3007				 LUSTRE_OSC_NAME, &osc_device_type);
3011	osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3013	/* This is obviously too much memory, only prevent overflow here */
3014	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3015		GOTO(out_type, rc = -EINVAL);
3017	reqpool_size = osc_reqpool_mem_max << 20;
	/* Round the request size up to the next power of two covering
	 * OST_IO_MAXREQSIZE. */
3020	while (reqsize < OST_IO_MAXREQSIZE)
3021		reqsize = reqsize << 1;
3024	 * We don't enlarge the request count in OSC pool according to
3025	 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3026	 * tried after normal allocation failed. So a small OSC pool won't
3027	 * cause much performance degression in most of cases.
3029	osc_reqpool_maxreqcount = reqpool_size / reqsize;
3031	atomic_set(&osc_pool_req_count, 0);
3032	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3033					  ptlrpc_add_rqs_to_pool);
3035	if (osc_rq_pool != NULL)
	/* Error path: undo type registration and cache init. */
3039	class_unregister_type(LUSTRE_OSC_NAME);
3041	lu_kmem_fini(osc_caches);
3046 static void __exit osc_exit(void)
3048 remove_shrinker(osc_cache_shrinker);
3049 class_unregister_type(LUSTRE_OSC_NAME);
3050 lu_kmem_fini(osc_caches);
3051 ptlrpc_free_rq_pool(osc_rq_pool);
3054 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3055 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3056 MODULE_VERSION(LUSTRE_VERSION_STRING);
3057 MODULE_LICENSE("GPL");
3059 module_init(osc_init);
3060 module_exit(osc_exit);