4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <libcfs/libcfs.h>
37 #include <lprocfs_status.h>
38 #include <lustre_debug.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_ha.h>
42 #include <uapi/linux/lustre/lustre_ioctl.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <uapi/linux/lustre/lustre_param.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
51 #include "osc_internal.h"
/* Shared request pool for OSC BRW RPCs, so writes can make progress under
 * memory pressure.  NOTE(review): this listing is a numbered extraction
 * with blank/elided lines dropped; code left byte-identical. */
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
/* 0444: read-only module parameter, settable only at module load time */
59 module_param(osc_reqpool_mem_max, uint, 0444);
/* grant bookkeeping reuses the BRW async-args slot embedded in the request */
61 #define osc_grant_args osc_brw_async_args
/* Per-RPC async-argument blobs stored in req->rq_async_args; one type per
 * RPC family, each paired with its *_interpret() callback below.
 * NOTE(review): field lists and closing braces are partially elided in
 * this listing — do not infer full layouts from what is visible. */
/* args for OST_SETATTR / OST_PUNCH completion (osc_setattr_interpret) */
63 struct osc_setattr_args {
65 obd_enqueue_update_f sa_upcall;
/* args for OST_SYNC completion (osc_sync_interpret) */
69 struct osc_fsync_args {
70 struct osc_object *fa_obj;
72 obd_enqueue_update_f fa_upcall;
/* args for OST_LADVISE completion (osc_ladvise_interpret) */
76 struct osc_ladvise_args {
78 obd_enqueue_update_f la_upcall;
/* args carried through an async LDLM enqueue */
82 struct osc_enqueue_args {
83 struct obd_export *oa_exp;
84 enum ldlm_type oa_type;
85 enum ldlm_mode oa_mode;
87 osc_enqueue_upcall_f oa_upcall;
89 struct ost_lvb *oa_lvb;
90 struct lustre_handle oa_lockh;
/* Forward declarations for helpers defined later in this file; the
 * brw_interpret() prototype continues on a line elided from this listing. */
94 static void osc_release_ppga(struct brw_page **ppga, size_t count);
95 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Copy the in-memory obdo @oa into the OST_BODY field of @req's request
 * buffer, converting to wire format per the import's connect data. */
98 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
100 struct ost_body *body;
102 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* NOTE(review): a NULL-check/LASSERT on body is elided in this listing */
105 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: pack @oa into a request, wait for the reply,
 * and copy the returned attributes back into @oa.  The client fills in
 * o_blksize from the negotiated BRW size since the server does not.
 * NOTE(review): error-path lines (NULL checks, RETURN/GOTO targets,
 * braces) are elided in this listing. */
108 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
111 struct ptlrpc_request *req;
112 struct ost_body *body;
116 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
120 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
122 ptlrpc_request_free(req);
126 osc_pack_req_body(req, oa);
128 ptlrpc_request_set_replen(req);
130 rc = ptlrpc_queue_wait(req);
134 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply lacked an OST_BODY: protocol error */
136 GOTO(out, rc = -EPROTO);
138 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
139 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* block size is a client-side notion derived from the BRW size */
141 oa->o_blksize = cli_brw_size(exp->exp_obd);
142 oa->o_valid |= OBD_MD_FLBLKSZ;
146 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send @oa's attributes to the OST and copy the
 * server's view back into @oa on success.  Caller must have set the
 * object group in o_valid (asserted below).
 * NOTE(review): error-path lines are elided in this listing. */
151 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
154 struct ptlrpc_request *req;
155 struct ost_body *body;
159 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
161 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
165 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
167 ptlrpc_request_free(req);
171 osc_pack_req_body(req, oa);
173 ptlrpc_request_set_replen(req);
175 rc = ptlrpc_queue_wait(req);
179 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
181 GOTO(out, rc = -EPROTO);
183 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
187 ptlrpc_req_finished(req);
/* Reply interpreter shared by async OST_SETATTR and OST_PUNCH: unpack the
 * reply obdo into sa->sa_oa, then invoke the caller's upcall with the
 * final rc.  Runs in ptlrpcd context. */
192 static int osc_setattr_interpret(const struct lu_env *env,
193 struct ptlrpc_request *req,
194 struct osc_setattr_args *sa, int rc)
196 struct ost_body *body;
202 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
204 GOTO(out, rc = -EPROTO);
/* call continues on a line elided from this listing (&body->oa arg) */
206 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
209 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs @oa and dispatches the request:
 * with no rqset it is fire-and-forget, with PTLRPCD_SET it goes to the
 * ptlrpcd daemons, otherwise it is added to the caller's set.  @upcall
 * (with @cookie) is invoked from osc_setattr_interpret on completion.
 * NOTE(review): the if/else branches around the two dispatch paths are
 * elided in this listing — the visible ordering is not the control flow. */
213 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
214 obd_enqueue_update_f upcall, void *cookie,
215 struct ptlrpc_request_set *rqset)
217 struct ptlrpc_request *req;
218 struct osc_setattr_args *sa;
223 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
227 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
229 ptlrpc_request_free(req);
233 osc_pack_req_body(req, oa);
235 ptlrpc_request_set_replen(req);
237 /* do mds to ost setattr asynchronously */
239 /* Do not wait for response. */
240 ptlrpcd_add_req(req);
242 req->rq_interpret_reply =
243 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async args must fit in the request's embedded scratch space */
245 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
246 sa = ptlrpc_req_async_args(req);
248 sa->sa_upcall = upcall;
249 sa->sa_cookie = cookie;
251 if (rqset == PTLRPCD_SET)
252 ptlrpcd_add_req(req);
254 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for async OST_LADVISE: copy the reply obdo back to
 * the caller's buffer and invoke the upcall with the final rc. */
260 static int osc_ladvise_interpret(const struct lu_env *env,
261 struct ptlrpc_request *req,
264 struct osc_ladvise_args *la = arg;
265 struct ost_body *body;
271 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
273 GOTO(out, rc = -EPROTO);
275 *la->la_oa = body->oa;
277 rc = la->la_upcall(la->la_cookie, rc);
/* Send an OST_LADVISE RPC carrying @num_advise lu_ladvise hints to the
 * OST_IO portal.
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case
 * NOTE(review): the if/else branches selecting between the dispatch
 * paths are elided in this listing. */
285 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
286 struct ladvise_hdr *ladvise_hdr,
287 obd_enqueue_update_f upcall, void *cookie,
288 struct ptlrpc_request_set *rqset)
290 struct ptlrpc_request *req;
291 struct ost_body *body;
292 struct osc_ladvise_args *la;
294 struct lu_ladvise *req_ladvise;
295 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
296 int num_advise = ladvise_hdr->lah_count;
297 struct ladvise_hdr *req_ladvise_hdr;
300 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* size the variable-length advice array before packing */
304 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
305 num_advise * sizeof(*ladvise));
306 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
308 ptlrpc_request_free(req);
311 req->rq_request_portal = OST_IO_PORTAL;
312 ptlrpc_at_set_req_timeout(req);
314 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
316 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
319 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
320 &RMF_OST_LADVISE_HDR);
321 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
323 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
324 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
325 ptlrpc_request_set_replen(req);
328 /* Do not wait for response. */
329 ptlrpcd_add_req(req);
333 req->rq_interpret_reply = osc_ladvise_interpret;
334 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
335 la = ptlrpc_req_async_args(req);
337 la->la_upcall = upcall;
338 la->la_cookie = cookie;
340 if (rqset == PTLRPCD_SET)
341 ptlrpcd_add_req(req);
343 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo objects here (asserted via
 * fid_seq_is_echo); regular object creation goes through the OSP.
 * On success the server-assigned attributes are copied back into @oa and
 * the client fills in o_blksize locally.
 * NOTE(review): error-path lines are elided in this listing. */
348 static int osc_create(const struct lu_env *env, struct obd_export *exp,
351 struct ptlrpc_request *req;
352 struct ost_body *body;
357 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
358 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
360 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
362 GOTO(out, rc = -ENOMEM);
364 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
366 ptlrpc_request_free(req);
370 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
373 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
375 ptlrpc_request_set_replen(req);
377 rc = ptlrpc_queue_wait(req);
381 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
383 GOTO(out_req, rc = -EPROTO);
385 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
386 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
388 oa->o_blksize = cli_brw_size(exp->exp_obd);
389 oa->o_valid |= OBD_MD_FLBLKSZ;
391 CDEBUG(D_HA, "transno: %lld\n",
392 lustre_msg_get_transno(req->rq_repmsg));
394 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate/hole-punch).  The punch range travels
 * in @oa's size/blocks fields.  Completion goes through the shared
 * osc_setattr_interpret, which calls @upcall(@cookie, rc).
 * NOTE(review): dispatch-path branches are elided in this listing. */
399 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
400 obd_enqueue_update_f upcall, void *cookie,
401 struct ptlrpc_request_set *rqset)
403 struct ptlrpc_request *req;
404 struct osc_setattr_args *sa;
405 struct ost_body *body;
409 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
413 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
415 ptlrpc_request_free(req);
418 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
419 ptlrpc_at_set_req_timeout(req);
421 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
423 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
425 ptlrpc_request_set_replen(req);
427 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
428 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
429 sa = ptlrpc_req_async_args(req);
431 sa->sa_upcall = upcall;
432 sa->sa_cookie = cookie;
433 if (rqset == PTLRPCD_SET)
434 ptlrpcd_add_req(req);
436 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for OST_SYNC: copy the reply obdo back to the caller,
 * refresh the osc object's cached blocks attribute under the attr lock,
 * then invoke the upcall with the final rc. */
441 static int osc_sync_interpret(const struct lu_env *env,
442 struct ptlrpc_request *req,
445 struct osc_fsync_args *fa = arg;
446 struct ost_body *body;
447 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
448 unsigned long valid = 0;
449 struct cl_object *obj;
455 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
457 CERROR("can't unpack ost_body\n");
458 GOTO(out, rc = -EPROTO);
461 *fa->fa_oa = body->oa;
462 obj = osc2cl(fa->fa_obj);
464 /* Update osc object's blocks attribute */
465 cl_object_attr_lock(obj);
466 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
467 attr->cat_blocks = body->oa.o_blocks;
/* valid is presumably set to CAT_BLOCKS on a line elided here — confirm */
472 cl_object_attr_update(env, obj, attr, valid);
473 cl_object_attr_unlock(obj);
476 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC for @obj.  The sync byte range is carried in
 * @oa's size/blocks fields (see comment below).  Completion goes through
 * osc_sync_interpret which calls @upcall(@cookie, rc).
 * NOTE(review): dispatch-path branches are elided in this listing. */
480 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
481 obd_enqueue_update_f upcall, void *cookie,
482 struct ptlrpc_request_set *rqset)
484 struct obd_export *exp = osc_export(obj);
485 struct ptlrpc_request *req;
486 struct ost_body *body;
487 struct osc_fsync_args *fa;
491 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
495 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
497 ptlrpc_request_free(req);
501 /* overload the size and blocks fields in the oa with start/end */
502 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
504 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
506 ptlrpc_request_set_replen(req);
507 req->rq_interpret_reply = osc_sync_interpret;
509 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
510 fa = ptlrpc_req_async_args(req);
513 fa->fa_upcall = upcall;
514 fa->fa_cookie = cookie;
516 if (rqset == PTLRPCD_SET)
517 ptlrpcd_add_req(req);
519 ptlrpc_set_add_req(rqset, req);
524 /* Find and cancel locally locks matched by @mode in the resource found by
525 * @objid. Found locks are added into @cancel list. Returns the amount of
526 * locks added to @cancels list. */
527 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
528 struct list_head *cancels,
529 enum ldlm_mode mode, __u64 lock_flags)
531 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
532 struct ldlm_res_id res_id;
533 struct ldlm_resource *res;
537 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
538 * export) but disabled through procfs (flag in NS).
540 * This distinguishes from a case when ELC is not supported originally,
541 * when we still want to cancel locks in advance and just cancel them
542 * locally, without sending any RPC. */
543 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* build the LDLM resource name from the object id and look it up */
546 ostid_build_res_name(&oa->o_oi, &res_id);
547 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
551 LDLM_RESOURCE_ADDREF(res);
552 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
553 lock_flags, 0, NULL);
554 LDLM_RESOURCE_DELREF(res);
555 ldlm_resource_putref(res);
/* OST_DESTROY completion: release one in-flight-destroy slot and wake any
 * thread blocked in osc_destroy() waiting for a slot. */
559 static int osc_destroy_interpret(const struct lu_env *env,
560 struct ptlrpc_request *req, void *data,
563 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
565 atomic_dec(&cli->cl_destroy_in_flight);
566 wake_up(&cli->cl_destroy_waitq);
/* Try to take an in-flight-destroy slot: optimistically increment the
 * counter and keep the slot if we stayed within cl_max_rpcs_in_flight,
 * otherwise back out.  The inc/dec pair can race with a concurrent
 * release, hence the wake_up on the back-out path. */
570 static int osc_can_send_destroy(struct client_obd *cli)
572 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
573 cli->cl_max_rpcs_in_flight) {
574 /* The destroy request can be sent */
577 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
578 cli->cl_max_rpcs_in_flight) {
580 * The counter has been modified between the two atomic
583 wake_up(&cli->cl_destroy_waitq);
/* OST_DESTROY with early lock cancellation: cancel matching PW locks
 * locally (discarding dirty data), piggy-back the cancels on the destroy
 * RPC via ldlm_prep_elc_req, throttle to cl_max_rpcs_in_flight concurrent
 * destroys, then send fire-and-forget via ptlrpcd.
 * NOTE(review): error-path lines are elided in this listing. */
588 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
591 struct client_obd *cli = &exp->exp_obd->u.cli;
592 struct ptlrpc_request *req;
593 struct ost_body *body;
594 struct list_head cancels = LIST_HEAD_INIT(cancels);
599 CDEBUG(D_INFO, "oa NULL\n");
603 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
604 LDLM_FL_DISCARD_DATA);
606 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: drop the collected cancel locks */
608 ldlm_lock_list_put(&cancels, l_bl_ast, count);
612 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
615 ptlrpc_request_free(req);
619 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
620 ptlrpc_at_set_req_timeout(req);
622 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
624 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
626 ptlrpc_request_set_replen(req);
628 req->rq_interpret_reply = osc_destroy_interpret;
629 if (!osc_can_send_destroy(cli)) {
630 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
633 * Wait until the number of on-going destroy RPCs drops
634 * under max_rpc_in_flight
636 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
637 osc_can_send_destroy(cli), &lwi);
639 ptlrpc_req_finished(req);
644 /* Do not wait for response */
645 ptlrpcd_add_req(req);
/* Fill @oa's grant-related fields (o_dirty, o_undirty, o_grant, o_dropped)
 * from the client_obd's cached-page/grant accounting so each BRW RPC
 * reports the client's cache state to the server.  All reads/updates are
 * done under cl_loi_list_lock.  The CERRORs flag accounting inconsistency
 * but do not abort.
 * NOTE(review): several lines (braces, else branches, sanity resets) are
 * elided in this listing. */
649 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
652 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
654 LASSERT(!(oa->o_valid & bits));
657 spin_lock(&cli->cl_loi_list_lock);
/* with GRANT_PARAM the server accounts in grant bytes, not pages */
658 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
659 oa->o_dirty = cli->cl_dirty_grant;
661 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
662 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
663 cli->cl_dirty_max_pages)) {
664 CERROR("dirty %lu - %lu > dirty_max %lu\n",
665 cli->cl_dirty_pages, cli->cl_dirty_transit,
666 cli->cl_dirty_max_pages);
668 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
669 atomic_long_read(&obd_dirty_transit_pages) >
670 (long)(obd_max_dirty_pages + 1))) {
671 /* The atomic_read() allowing the atomic_inc() are
672 * not covered by a lock thus they may safely race and trip
673 * this CERROR() unless we add in a small fudge factor (+1). */
674 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
675 cli_name(cli), atomic_long_read(&obd_dirty_pages),
676 atomic_long_read(&obd_dirty_transit_pages),
677 obd_max_dirty_pages);
679 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
681 CERROR("dirty %lu - dirty_max %lu too big???\n",
682 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
685 unsigned long nrpages;
/* ask for enough grant to keep a full pipeline of RPCs dirty */
687 nrpages = cli->cl_max_pages_per_rpc;
688 nrpages *= cli->cl_max_rpcs_in_flight + 1;
689 nrpages = max(nrpages, cli->cl_dirty_max_pages);
690 oa->o_undirty = nrpages << PAGE_SHIFT;
691 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
695 /* take extent tax into account when asking for more
697 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
698 cli->cl_max_extent_pages;
699 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
702 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
703 oa->o_dropped = cli->cl_lost_grant;
704 cli->cl_lost_grant = 0;
705 spin_unlock(&cli->cl_loi_list_lock);
706 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
707 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check one shrink interval from now. */
710 void osc_update_next_shrink(struct client_obd *cli)
712 cli->cl_next_shrink_grant = ktime_get_seconds() +
713 cli->cl_grant_shrink_interval;
715 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
716 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the list lock. */
719 static void __osc_update_grant(struct client_obd *cli, u64 grant)
721 spin_lock(&cli->cl_loi_list_lock);
722 cli->cl_avail_grant += grant;
723 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
726 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
728 if (body->oa.o_valid & OBD_MD_FLGRANT) {
729 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
730 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: used below by osc_shrink_grant_to_target() before
 * the definition appears later in the file. */
734 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
735 u32 keylen, void *key,
736 u32 vallen, void *val,
737 struct ptlrpc_request_set *set);
/* Completion for a grant-shrink set_info RPC: on failure give the locally
 * deducted grant back; on success absorb whatever grant the server
 * returned in the reply body.
 * NOTE(review): the rc check between the two paths is elided here. */
739 static int osc_shrink_grant_interpret(const struct lu_env *env,
740 struct ptlrpc_request *req,
743 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
744 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
745 struct ost_body *body;
748 __osc_update_grant(cli, oa->o_grant);
752 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
754 osc_update_grant(cli, body);
/* Give back a quarter of the available grant by piggy-backing it on an
 * outgoing BRW: move the amount from cl_avail_grant into oa->o_grant and
 * tag the obdo with OBD_FL_SHRINK_GRANT. */
760 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
762 spin_lock(&cli->cl_loi_list_lock);
763 oa->o_grant = cli->cl_avail_grant / 4;
764 cli->cl_avail_grant -= oa->o_grant;
765 spin_unlock(&cli->cl_loi_list_lock);
766 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
767 oa->o_valid |= OBD_MD_FLFLAGS;
/* o_flags = 0 init is on a line elided from this listing */
770 oa->o_flags |= OBD_FL_SHRINK_GRANT;
771 osc_update_next_shrink(cli);
774 /* Shrink the current grant, either from some large amount to enough for a
775 * full set of in-flight RPCs, or if we have already shrunk to that limit
776 * then to enough for a single RPC. This avoids keeping more grant than
777 * needed, and avoids shrinking the grant piecemeal. */
778 static int osc_shrink_grant(struct client_obd *cli)
/* target: one full pipeline of max-size RPCs (+1 for slack) */
780 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
781 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
783 spin_lock(&cli->cl_loi_list_lock);
784 if (cli->cl_avail_grant <= target_bytes)
785 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
786 spin_unlock(&cli->cl_loi_list_lock);
788 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink cl_avail_grant down to @target_bytes by sending the difference
 * back to the server via a KEY_GRANT_SHRINK set_info RPC.  On send
 * failure the deducted grant is restored locally.
 * NOTE(review): body allocation and rc-check lines are elided in this
 * listing. */
791 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
794 struct ost_body *body;
797 spin_lock(&cli->cl_loi_list_lock);
798 /* Don't shrink if we are already above or below the desired limit
799 * We don't want to shrink below a single RPC, as that will negatively
800 * impact block allocation and long-term performance. */
801 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
802 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
804 if (target_bytes >= cli->cl_avail_grant) {
805 spin_unlock(&cli->cl_loi_list_lock);
808 spin_unlock(&cli->cl_loi_list_lock);
814 osc_announce_cached(cli, &body->oa, 0);
816 spin_lock(&cli->cl_loi_list_lock);
/* move the surplus from avail_grant into the RPC body */
817 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
818 cli->cl_avail_grant = target_bytes;
819 spin_unlock(&cli->cl_loi_list_lock);
820 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
821 body->oa.o_valid |= OBD_MD_FLFLAGS;
822 body->oa.o_flags = 0;
824 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
825 osc_update_next_shrink(cli);
827 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
828 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
829 sizeof(*body), body, NULL);
/* on failure, return the grant we deducted above */
831 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: requires the
 * server to support GRANT_SHRINK, the shrink deadline to be (nearly) due,
 * a FULL import, and more available grant than one RPC's worth. */
836 static int osc_should_shrink_grant(struct client_obd *client)
838 time64_t next_shrink = client->cl_next_shrink_grant;
840 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
841 OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow checks up to 5 seconds early so the timer cb can fire on time */
844 if (ktime_get_seconds() >= next_shrink - 5) {
845 /* Get the current RPC size directly, instead of going via:
846 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
847 * Keep comment here so that it can be found by searching. */
848 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
850 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
851 client->cl_avail_grant > brw_size)
/* not worth shrinking now; push the deadline out */
854 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client registered on the shrink
 * list and shrink the grant of those that are due. */
859 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
861 struct client_obd *client;
863 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
864 if (osc_should_shrink_grant(client))
865 osc_shrink_grant(client);
/* Register @client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline. */
870 static int osc_add_shrink_grant(struct client_obd *client)
874 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
876 osc_grant_shrink_grant_cb, NULL,
877 &client->cl_grant_shrink_list);
879 CERROR("add grant client %s error %d\n", cli_name(client), rc);
882 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
883 osc_update_next_shrink(client);
/* Unregister @client from the grant-shrink timeout list (counterpart of
 * osc_add_shrink_grant). */
887 static int osc_del_shrink_grant(struct client_obd *client)
889 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the connect reply @ocd: compute the initial
 * cl_avail_grant, derive chunk size / extent tax / max extent pages when
 * the server supports GRANT_PARAM, and register for periodic grant
 * shrinking when GRANT_SHRINK is supported. */
893 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
896 * ocd_grant is the total grant amount we're expect to hold: if we've
897 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
898 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
901 * race is tolerable here: if we're evicted, but imp_state already
902 * left EVICTED state, then cl_dirty_pages must be 0 already.
904 spin_lock(&cli->cl_loi_list_lock);
905 cli->cl_avail_grant = ocd->ocd_grant;
906 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
907 cli->cl_avail_grant -= cli->cl_reserved_grant;
908 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
909 cli->cl_avail_grant -= cli->cl_dirty_grant;
911 cli->cl_avail_grant -=
912 cli->cl_dirty_pages << PAGE_SHIFT;
915 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
919 /* overhead for each extent insertion */
920 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
921 /* determine the appropriate chunk size used by osc_extent. */
922 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
923 ocd->ocd_grant_blkbits);
924 /* max_pages_per_rpc must be chunk aligned */
925 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
926 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
927 ~chunk_mask) & chunk_mask;
928 /* determine maximum extent size, in #pages */
929 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
930 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
931 if (cli->cl_max_extent_pages == 0)
932 cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM: fall back to page-granularity defaults */
934 cli->cl_grant_extent_tax = 0;
935 cli->cl_chunkbits = PAGE_SHIFT;
936 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
938 spin_unlock(&cli->cl_loi_list_lock);
940 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
941 "chunk bits: %d cl_max_extent_pages: %d\n",
943 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
944 cli->cl_max_extent_pages);
946 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
947 list_empty(&cli->cl_grant_shrink_list))
948 osc_add_shrink_grant(cli);
951 /* We assume that the reason this OSC got a short read is because it read
952 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
953 * via the LOV, and it _knows_ it's reading inside the file, it's just that
954 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of the page array past @nob_read bytes actually
 * returned by the server.  NOTE(review): the i++/page_count-- advance
 * and kunmap lines are elided in this listing. */
955 static void handle_short_read(int nob_read, size_t page_count,
956 struct brw_page **pga)
961 /* skip bytes read OK */
962 while (nob_read > 0) {
963 LASSERT (page_count > 0);
965 if (pga[i]->count > nob_read) {
966 /* EOF inside this page */
967 ptr = kmap(pga[i]->pg) +
968 (pga[i]->off & ~PAGE_MASK);
969 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
976 nob_read -= pga[i]->count;
981 /* zero remaining pages */
982 while (page_count-- > 0) {
983 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
984 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return-code vector in a BRW_WRITE reply:
 * propagate the first negative rc, reject nonzero-but-positive entries as
 * protocol errors, and verify the bulk transferred exactly the number of
 * bytes requested. */
990 static int check_write_rcs(struct ptlrpc_request *req,
991 int requested_nob, int niocount,
992 size_t page_count, struct brw_page **pga)
997 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
998 sizeof(*remote_rcs) *
1000 if (remote_rcs == NULL) {
1001 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1005 /* return error if any niobuf was in error */
1006 for (i = 0; i < niocount; i++) {
1007 if ((int)remote_rcs[i] < 0)
1008 return(remote_rcs[i]);
1010 if (remote_rcs[i] != 0) {
1011 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1012 i, remote_rcs[i], req);
1016 if (req->rq_bulk != NULL &&
1017 req->rq_bulk->bd_nob_transferred != requested_nob) {
1018 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1019 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages may be merged into one niobuf iff they are byte-adjacent
 * (p1 ends exactly where p2 starts) and their BRW flags differ only in
 * bits known to be safe to combine. */
1026 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1028 if (p1->flag != p2->flag) {
1029 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1030 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1031 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1033 /* warn if we try to combine flags that we don't know to be
1034 * safe to combine */
1035 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1036 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1037 "report this at https://jira.hpdd.intel.com/\n",
1038 p1->flag, p2->flag);
1043 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of @pga using the
 * hash algorithm mapped from @cksum_type.  Honors the fault-injection
 * points that deliberately corrupt read data / the computed write
 * checksum for testing.  Returns the 32-bit checksum (or PTR_ERR on hash
 * init failure).
 * NOTE(review): loop advance (i++, pg_count--) and the final corrupt-
 * checksum line are elided in this listing. */
1046 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1047 struct brw_page **pga, int opc,
1048 enum cksum_types cksum_type)
1052 struct cfs_crypto_hash_desc *hdesc;
1053 unsigned int bufsize;
1054 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1056 LASSERT(pg_count > 0);
1058 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1059 if (IS_ERR(hdesc)) {
1060 CERROR("Unable to initialize checksum hash %s\n",
1061 cfs_crypto_hash_name(cfs_alg));
1062 return PTR_ERR(hdesc);
1065 while (nob > 0 && pg_count > 0) {
1066 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1068 /* corrupt the data before we compute the checksum, to
1069 * simulate an OST->client data error */
1070 if (i == 0 && opc == OST_READ &&
1071 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1072 unsigned char *ptr = kmap(pga[i]->pg);
1073 int off = pga[i]->off & ~PAGE_MASK;
1075 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1078 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1079 pga[i]->off & ~PAGE_MASK,
1081 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1082 (int)(pga[i]->off & ~PAGE_MASK));
1084 nob -= pga[i]->count;
1089 bufsize = sizeof(cksum);
1090 cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1092 /* For sending we only compute the wrong checksum instead
1093 * of corrupting the data so it is still correct on a redo */
1094 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1101 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1102 u32 page_count, struct brw_page **pga,
1103 struct ptlrpc_request **reqp, int resend)
1105 struct ptlrpc_request *req;
1106 struct ptlrpc_bulk_desc *desc;
1107 struct ost_body *body;
1108 struct obd_ioobj *ioobj;
1109 struct niobuf_remote *niobuf;
1110 int niocount, i, requested_nob, opc, rc, short_io_size;
1111 struct osc_brw_async_args *aa;
1112 struct req_capsule *pill;
1113 struct brw_page *pg_prev;
1117 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1118 RETURN(-ENOMEM); /* Recoverable */
1119 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1120 RETURN(-EINVAL); /* Fatal */
1122 if ((cmd & OBD_BRW_WRITE) != 0) {
1124 req = ptlrpc_request_alloc_pool(cli->cl_import,
1126 &RQF_OST_BRW_WRITE);
1129 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1134 for (niocount = i = 1; i < page_count; i++) {
1135 if (!can_merge_pages(pga[i - 1], pga[i]))
1139 pill = &req->rq_pill;
1140 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1142 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1143 niocount * sizeof(*niobuf));
1145 for (i = 0; i < page_count; i++)
1146 short_io_size += pga[i]->count;
1148 /* Check if we can do a short io. */
1149 if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
1150 imp_connect_shortio(cli->cl_import)))
1153 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1154 opc == OST_READ ? 0 : short_io_size);
1155 if (opc == OST_READ)
1156 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1159 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1161 ptlrpc_request_free(req);
1164 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1165 ptlrpc_at_set_req_timeout(req);
1167 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1169 req->rq_no_retry_einprogress = 1;
1171 if (short_io_size != 0) {
1173 short_io_buf = NULL;
1177 desc = ptlrpc_prep_bulk_imp(req, page_count,
1178 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1179 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1180 PTLRPC_BULK_PUT_SINK) |
1181 PTLRPC_BULK_BUF_KIOV,
1183 &ptlrpc_bulk_kiov_pin_ops);
1186 GOTO(out, rc = -ENOMEM);
1187 /* NB request now owns desc and will free it when it gets freed */
1189 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1190 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1191 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1192 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1194 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1196 obdo_to_ioobj(oa, ioobj);
1197 ioobj->ioo_bufcnt = niocount;
1198 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1199 * that might be send for this request. The actual number is decided
1200 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1201 * "max - 1" for old client compatibility sending "0", and also so the
1202 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1204 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1206 ioobj_max_brw_set(ioobj, 0);
1208 if (short_io_size != 0) {
1209 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1210 body->oa.o_valid |= OBD_MD_FLFLAGS;
1211 body->oa.o_flags = 0;
1213 body->oa.o_flags |= OBD_FL_SHORT_IO;
1214 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1216 if (opc == OST_WRITE) {
1217 short_io_buf = req_capsule_client_get(pill,
1219 LASSERT(short_io_buf != NULL);
1223 LASSERT(page_count > 0);
1225 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1226 struct brw_page *pg = pga[i];
1227 int poff = pg->off & ~PAGE_MASK;
1229 LASSERT(pg->count > 0);
1230 /* make sure there is no gap in the middle of page array */
1231 LASSERTF(page_count == 1 ||
1232 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1233 ergo(i > 0 && i < page_count - 1,
1234 poff == 0 && pg->count == PAGE_SIZE) &&
1235 ergo(i == page_count - 1, poff == 0)),
1236 "i: %d/%d pg: %p off: %llu, count: %u\n",
1237 i, page_count, pg, pg->off, pg->count);
1238 LASSERTF(i == 0 || pg->off > pg_prev->off,
1239 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1240 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1242 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1243 pg_prev->pg, page_private(pg_prev->pg),
1244 pg_prev->pg->index, pg_prev->off);
1245 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1246 (pg->flag & OBD_BRW_SRVLOCK));
1247 if (short_io_size != 0 && opc == OST_WRITE) {
1248 unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1250 LASSERT(short_io_size >= requested_nob + pg->count);
1251 memcpy(short_io_buf + requested_nob,
1254 ll_kunmap_atomic(ptr, KM_USER0);
1255 } else if (short_io_size == 0) {
1256 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1259 requested_nob += pg->count;
1261 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1263 niobuf->rnb_len += pg->count;
1265 niobuf->rnb_offset = pg->off;
1266 niobuf->rnb_len = pg->count;
1267 niobuf->rnb_flags = pg->flag;
1272 LASSERTF((void *)(niobuf - niocount) ==
1273 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1274 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1275 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1277 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1279 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1280 body->oa.o_valid |= OBD_MD_FLFLAGS;
1281 body->oa.o_flags = 0;
1283 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1286 if (osc_should_shrink_grant(cli))
1287 osc_shrink_grant_local(cli, &body->oa);
1289 /* size[REQ_REC_OFF] still sizeof (*body) */
1290 if (opc == OST_WRITE) {
1291 if (cli->cl_checksum &&
1292 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1293 /* store cl_cksum_type in a local variable since
1294 * it can be changed via lprocfs */
1295 enum cksum_types cksum_type = cli->cl_cksum_type;
1297 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1298 body->oa.o_flags = 0;
1300 body->oa.o_flags |= cksum_type_pack(cksum_type);
1301 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1302 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1306 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1308 /* save this in 'oa', too, for later checking */
1309 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1310 oa->o_flags |= cksum_type_pack(cksum_type);
1312 /* clear out the checksum flag, in case this is a
1313 * resend but cl_checksum is no longer set. b=11238 */
1314 oa->o_valid &= ~OBD_MD_FLCKSUM;
1316 oa->o_cksum = body->oa.o_cksum;
1317 /* 1 RC per niobuf */
1318 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1319 sizeof(__u32) * niocount);
1321 if (cli->cl_checksum &&
1322 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1323 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1324 body->oa.o_flags = 0;
1325 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1326 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1329 /* Client cksum has been already copied to wire obdo in previous
1330 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1331 * resent due to cksum error, this will allow Server to
1332 * check+dump pages on its side */
1334 ptlrpc_request_set_replen(req);
1336 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1337 aa = ptlrpc_req_async_args(req);
1339 aa->aa_requested_nob = requested_nob;
1340 aa->aa_nio_count = niocount;
1341 aa->aa_page_count = page_count;
1345 INIT_LIST_HEAD(&aa->aa_oaps);
1348 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1349 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1350 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1351 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1355 ptlrpc_req_finished(req);
/* File-scope scratch buffer for the checksum-dump pathname -- presumably
 * kept out of the stack frame because it is PATH_MAX bytes; note it is
 * not visibly lock-protected, so concurrent dumps could race on it.
 * NOTE(review): this chunk is an extraction; some original lines are
 * missing between the statements below. */
1359 char dbgcksum_file_name[PATH_MAX];

/*
 * Dump the raw contents of every bulk page of a checksum-failing BRW to
 * a debug file so the data can be inspected post-mortem.
 *
 * \param oa           wire obdo of the request (FID fields go in the name)
 * \param page_count   number of entries in \a pga
 * \param pga          array of brw_page descriptors whose pages are dumped
 * \param server_cksum checksum the server computed (encoded in file name)
 */
1361 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1362 struct brw_page **pga, __u32 server_cksum,
/* Build a name unique per fid/extent/checksum pair; O_EXCL below then
 * guarantees only the first error for that exact range is kept. */
1371 /* will only keep dump of pages on first error for the same range in
1372 * file/fid, not during the resends/retries. */
1373 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1374 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
/* Prefer the admin-configured debug path, else the compiled-in default. */
1375 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1376 libcfs_debug_file_path_arr :
1377 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
/* Parent FID fields are only meaningful when OBD_MD_FLFID is set. */
1378 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1379 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1380 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1382 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1383 client_cksum, server_cksum);
1384 filp = filp_open(dbgcksum_file_name,
1385 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
/* A dump that already exists is only informational; other open
 * failures are real errors. */
1389 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1390 "checksum error: rc = %d\n", dbgcksum_file_name,
1393 CERROR("%s: can't open to dump pages with checksum "
1394 "error: rc = %d\n", dbgcksum_file_name, rc);
/* Write each page's data: map it, write pg->count bytes, and log any
 * short write. */
1400 for (i = 0; i < page_count; i++) {
1401 len = pga[i]->count;
1402 buf = kmap(pga[i]->pg);
1404 rc = vfs_write(filp, (__force const char __user *)buf,
1407 CERROR("%s: wanted to write %u but got %d "
1408 "error\n", dbgcksum_file_name, len, rc);
1413 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1414 dbgcksum_file_name, rc);
/* Push the dump to stable storage before closing so it survives a
 * subsequent crash. */
1420 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1422 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1423 filp_close(filp, NULL);
/*
 * The server reported a write checksum that differs from what the client
 * sent; recompute the checksum over the still-held local pages to decide
 * where the data changed (client after checksumming, in transit, or a
 * checksum-type mismatch) and log a console error accordingly.
 * NOTE(review): the return-type line and return statements are not
 * visible in this extract -- presumably the result tells the caller
 * whether to treat the mismatch as fatal; confirm in the full source.
 */
1428 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1429 __u32 client_cksum, __u32 server_cksum,
1430 struct osc_brw_async_args *aa)
1434 enum cksum_types cksum_type;
/* Fast path: server agrees with what we computed -- nothing to do. */
1436 if (server_cksum == client_cksum) {
1437 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Optionally dump the page contents for offline analysis. */
1441 if (aa->aa_cli->cl_checksum_dump)
1442 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1443 server_cksum, client_cksum);
/* Recompute using the checksum type the server reported in o_flags. */
1445 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1447 new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1448 aa->aa_ppga, OST_WRITE, cksum_type);
/* Classify the failure by comparing the freshly computed checksum
 * against both the original client value and the server's value. */
1450 if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1451 msg = "the server did not use the checksum type specified in "
1452 "the original request - likely a protocol problem";
1453 else if (new_cksum == server_cksum)
1454 msg = "changed on the client after we checksummed it - "
1455 "likely false positive due to mmap IO (bug 11742)";
1456 else if (new_cksum == client_cksum)
1457 msg = "changed in transit before arrival at OST";
1459 msg = "changed in transit AND doesn't match the original - "
1460 "likely false positive due to mmap IO (bug 11742)";
/* One loud, rate-unlimited console message with full identification:
 * obd, peer nid, parent fid (if valid), object id, extent and all
 * three checksum values. */
1462 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1463 DFID " object "DOSTID" extent [%llu-%llu], original "
1464 "client csum %x (type %x), server csum %x (type %x),"
1465 " client csum now %x\n",
1466 aa->aa_cli->cl_import->imp_obd->obd_name,
1467 msg, libcfs_nid2str(peer->nid),
1468 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1469 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1470 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1471 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1472 aa->aa_ppga[aa->aa_page_count - 1]->off +
1473 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1474 client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1475 server_cksum, cksum_type, new_cksum);
/* Note rc enters this function as number of bytes transferred */
/*
 * Finish processing a BRW reply: unpack the ost_body, apply quota and
 * grant feedback, verify checksums (write: via check_write_checksum();
 * read: recompute locally and compare), copy short-io read data into the
 * pages, and finally merge the returned wire obdo back into aa->aa_oa.
 * NOTE(review): extraction gaps -- several lines (braces, GOTOs, some
 * declarations) are missing from this view.
 */
1480 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1482 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1483 const struct lnet_process_id *peer =
1484 &req->rq_import->imp_connection->c_peer;
1485 struct client_obd *cli = aa->aa_cli;
1486 struct ost_body *body;
1487 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags below); any other
 * negative rc is a hard failure. */
1490 if (rc < 0 && rc != -EDQUOT) {
1491 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1495 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1496 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1498 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1502 /* set/clear over quota flag for a uid/gid/projid */
1503 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1504 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1505 unsigned qid[LL_MAXQUOTAS] = {
1506 body->oa.o_uid, body->oa.o_gid,
1507 body->oa.o_projid };
1508 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1509 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1510 body->oa.o_valid, body->oa.o_flags);
1511 osc_quota_setdq(cli, qid, body->oa.o_valid,
/* Fold the server's grant information into the client_obd. */
1515 osc_update_grant(cli, body);
/* Remember the checksum we sent; compared against the reply below. */
1520 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1521 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* ---- OST_WRITE reply handling ---- */
1523 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1525 CERROR("Unexpected +ve rc %d\n", rc);
1529 if (req->rq_bulk != NULL &&
1530 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* If the server says the write checksum was bad, let
 * check_write_checksum() recompute and classify the corruption. */
1533 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1534 check_write_checksum(&body->oa, peer, client_cksum,
1535 body->oa.o_cksum, aa))
/* Per-niobuf return codes from the server. */
1538 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1539 aa->aa_page_count, aa->aa_ppga);
1543 /* The rest of this function executes only for OST_READs */
/* Short-io read: the data came inline in the reply, not via bulk. */
1545 if (req->rq_bulk == NULL) {
1546 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1548 LASSERT(rc == req->rq_status);
1550 /* if unwrap_bulk failed, return -EAGAIN to retry */
1551 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1554 GOTO(out, rc = -EAGAIN);
/* Sanity: server must not return more bytes than we asked for, and
 * rc must agree with what the bulk layer actually moved. */
1556 if (rc > aa->aa_requested_nob) {
1557 CERROR("Unexpected rc %d (%d requested)\n", rc,
1558 aa->aa_requested_nob);
1562 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1563 CERROR ("Unexpected rc %d (%d transferred)\n",
1564 rc, req->rq_bulk->bd_nob_transferred);
/* Copy inline short-io data into the destination pages. */
1568 if (req->rq_bulk == NULL) {
1570 int nob, pg_count, i = 0;
1573 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1574 pg_count = aa->aa_page_count;
1575 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1578 while (nob > 0 && pg_count > 0) {
1580 int count = aa->aa_ppga[i]->count > nob ?
1581 nob : aa->aa_ppga[i]->count;
1583 CDEBUG(D_CACHE, "page %p count %d\n",
1584 aa->aa_ppga[i]->pg, count);
1585 ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1586 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1588 ll_kunmap_atomic((void *) ptr, KM_USER0);
/* Zero-fill the tail of a short read. */
1597 if (rc < aa->aa_requested_nob)
1598 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Read checksum verification: recompute over received pages and
 * compare against the server's value from the reply. */
1600 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1601 static int cksum_counter;
1602 u32 server_cksum = body->oa.o_cksum;
1605 enum cksum_types cksum_type;
1607 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1608 body->oa.o_flags : 0);
1609 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1610 aa->aa_ppga, OST_READ,
/* Identify an intermediate router if the bulk did not come
 * directly from the peer. */
1613 if (req->rq_bulk != NULL &&
1614 peer->nid != req->rq_bulk->bd_sender) {
1616 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1619 if (server_cksum != client_cksum) {
1620 struct ost_body *clbody;
1621 u32 page_count = aa->aa_page_count;
1623 clbody = req_capsule_client_get(&req->rq_pill,
1625 if (cli->cl_checksum_dump)
1626 dump_all_bulk_pages(&clbody->oa, page_count,
1627 aa->aa_ppga, server_cksum,
1630 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1631 "%s%s%s inode "DFID" object "DOSTID
1632 " extent [%llu-%llu], client %x, "
1633 "server %x, cksum_type %x\n",
1634 req->rq_import->imp_obd->obd_name,
1635 libcfs_nid2str(peer->nid),
1637 clbody->oa.o_valid & OBD_MD_FLFID ?
1638 clbody->oa.o_parent_seq : 0ULL,
1639 clbody->oa.o_valid & OBD_MD_FLFID ?
1640 clbody->oa.o_parent_oid : 0,
1641 clbody->oa.o_valid & OBD_MD_FLFID ?
1642 clbody->oa.o_parent_ver : 0,
1643 POSTID(&body->oa.o_oi),
1644 aa->aa_ppga[0]->off,
1645 aa->aa_ppga[page_count-1]->off +
1646 aa->aa_ppga[page_count-1]->count - 1,
1647 client_cksum, server_cksum,
1650 aa->aa_oa->o_cksum = client_cksum;
1654 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server didn't send one; complain
 * with power-of-two backoff (the (x & -x) == x test). */
1657 } else if (unlikely(client_cksum)) {
1658 static int cksum_missed;
1661 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1662 CERROR("Checksum %u requested from %s but not sent\n",
1663 cksum_missed, libcfs_nid2str(peer->nid));
/* Merge the server's returned obdo fields back into our copy. */
1669 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1670 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW request that failed with a recoverable error
 * (e.g. -EINPROGRESS).  A brand-new ptlrpc request is prepared from the
 * same pages; it takes ownership of the pga and oap/extent lists from
 * the old request and is then handed to ptlrpcd.
 * NOTE(review): extraction gaps -- some lines (braces, returns, a few
 * assignments) are missing from this view.
 */
1675 static int osc_brw_redo_request(struct ptlrpc_request *request,
1676 struct osc_brw_async_args *aa, int rc)
1678 struct ptlrpc_request *new_req;
1679 struct osc_brw_async_args *new_aa;
1680 struct osc_async_page *oap;
/* -EINPROGRESS is an expected retry, anything else is logged loudly. */
1683 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1684 "redo for recoverable error %d", rc);
/* Build a fresh request for the same pages/obdo (last arg 1 = resend). */
1686 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1687 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1688 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1689 aa->aa_ppga, &new_req, 1);
/* If any page's caller was interrupted, abandon the resend. */
1693 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1694 if (oap->oap_request != NULL) {
1695 LASSERTF(request == oap->oap_request,
1696 "request %p != oap_request %p\n",
1697 request, oap->oap_request);
1698 if (oap->oap_interrupted) {
1699 ptlrpc_req_finished(new_req);
1704 /* New request takes over pga and oaps from old request.
1705 * Note that copying a list_head doesn't work, need to move it... */
1707 new_req->rq_interpret_reply = request->rq_interpret_reply;
1708 new_req->rq_async_args = request->rq_async_args;
1709 new_req->rq_commit_cb = request->rq_commit_cb;
1710 /* cap resend delay to the current request timeout, this is similar to
1711 * what ptlrpc does (see after_reply()) */
1712 if (aa->aa_resends > new_req->rq_timeout)
1713 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1715 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
/* Pin the import generation so an eviction in between is detected. */
1716 new_req->rq_generation_set = 1;
1717 new_req->rq_import_generation = request->rq_import_generation;
1719 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new async args. */
1721 INIT_LIST_HEAD(&new_aa->aa_oaps);
1722 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1723 INIT_LIST_HEAD(&new_aa->aa_exts);
1724 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1725 new_aa->aa_resends = aa->aa_resends;
/* Re-point every oap at the new request, dropping the old reference. */
1727 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1728 if (oap->oap_request) {
1729 ptlrpc_req_finished(oap->oap_request);
1730 oap->oap_request = ptlrpc_request_addref(new_req);
1734 /* XXX: This code will run into problem if we're going to support
1735 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1736 * and wait for all of them to be finished. We should inherit request
1737 * set from old request. */
1738 ptlrpcd_add_req(new_req);
1740 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * ugh, we want disk allocation on the target to happen in offset order. we'll
 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. its an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until its '1' and the array is sorted.
 * Sorts @array (of @num brw_page pointers) ascending by ->off, in place.
 */
1751 static void sort_brw_pages(struct brw_page **array, int num)
1754 struct brw_page *tmp;
/* Grow the gap sequence (1, 4, 13, 40, ...) until it exceeds num. */
1758 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort for the current stride. */
1763 for (i = stride ; i < num ; i++) {
1766 while (j >= stride && array[j - stride]->off > tmp->off) {
1767 array[j] = array[j - stride];
/* Repeat with shrinking stride until a plain insertion-sort pass ran. */
1772 } while (stride > 1);
/* Free the brw_page pointer array (allocated in osc_build_rpc()); the
 * pages it points at are owned by the osc_async_pages and are NOT freed
 * here. */
1775 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1777 LASSERT(ppga != NULL);
1778 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpret callback for BRW RPCs (runs from ptlrpcd).  Finalizes
 * the reply via osc_brw_fini_request(), resends on recoverable errors,
 * and on completion updates the cl_object attributes (blocks/times,
 * size/KMS for writes), finishes all attached extents, releases the page
 * array and adjusts the in-flight RPC accounting.
 * NOTE(review): extraction gaps -- some lines (braces, GOTO/RETURN,
 * valid-flag updates) are missing from this view.
 */
1781 static int brw_interpret(const struct lu_env *env,
1782 struct ptlrpc_request *req, void *data, int rc)
1784 struct osc_brw_async_args *aa = data;
1785 struct osc_extent *ext;
1786 struct osc_extent *tmp;
1787 struct client_obd *cli = aa->aa_cli;
1788 unsigned long transferred = 0;
1791 rc = osc_brw_fini_request(req, rc);
1792 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1793 /* When server return -EINPROGRESS, client should always retry
1794 * regardless of the number of times the bulk was resent already. */
1795 if (osc_recoverable_error(rc)) {
/* Cross-eviction: the import generation changed; don't resend. */
1796 if (req->rq_import_generation !=
1797 req->rq_import->imp_generation) {
1798 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1799 ""DOSTID", rc = %d.\n",
1800 req->rq_import->imp_obd->obd_name,
1801 POSTID(&aa->aa_oa->o_oi), rc);
1802 } else if (rc == -EINPROGRESS ||
1803 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1804 rc = osc_brw_redo_request(req, aa, rc);
1806 CERROR("%s: too many resent retries for object: "
1807 "%llu:%llu, rc = %d.\n",
1808 req->rq_import->imp_obd->obd_name,
1809 POSTID(&aa->aa_oa->o_oi), rc);
1814 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: propagate server-returned attributes to the cl_object
 * of the last page's osc object. */
1819 struct obdo *oa = aa->aa_oa;
1820 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1821 unsigned long valid = 0;
1822 struct cl_object *obj;
1823 struct osc_async_page *last;
1825 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1826 obj = osc2cl(last->oap_obj);
1828 cl_object_attr_lock(obj);
1829 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1830 attr->cat_blocks = oa->o_blocks;
1831 valid |= CAT_BLOCKS;
1833 if (oa->o_valid & OBD_MD_FLMTIME) {
1834 attr->cat_mtime = oa->o_mtime;
1837 if (oa->o_valid & OBD_MD_FLATIME) {
1838 attr->cat_atime = oa->o_atime;
1841 if (oa->o_valid & OBD_MD_FLCTIME) {
1842 attr->cat_ctime = oa->o_ctime;
/* For writes, possibly extend the cached file size and KMS to the end
 * of the last page written. */
1846 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1847 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1848 loff_t last_off = last->oap_count + last->oap_obj_off +
1851 /* Change file size if this is an out of quota or
1852 * direct IO write and it extends the file size */
1853 if (loi->loi_lvb.lvb_size < last_off) {
1854 attr->cat_size = last_off;
1857 /* Extend KMS if it's not a lockless write */
1858 if (loi->loi_kms < last_off &&
1859 oap2osc_page(last)->ops_srvlock == 0) {
1860 attr->cat_kms = last_off;
1866 cl_object_attr_update(env, obj, attr, valid);
1867 cl_object_attr_unlock(obj);
1869 OBDO_FREE(aa->aa_oa);
/* Track pages that are written but not yet committed on the server. */
1871 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1872 osc_inc_unstable_pages(req);
/* Complete every extent attached to this RPC. */
1874 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1875 list_del_init(&ext->oe_link);
1876 osc_extent_finish(env, ext, 1, rc);
1878 LASSERT(list_empty(&aa->aa_exts));
1879 LASSERT(list_empty(&aa->aa_oaps));
1881 transferred = (req->rq_bulk == NULL ? /* short io */
1882 aa->aa_requested_nob :
1883 req->rq_bulk->bd_nob_transferred);
1885 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1886 ptlrpc_lprocfs_brw(req, transferred);
1888 spin_lock(&cli->cl_loi_list_lock);
1889 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1890 * is called so we know whether to go to sync BRWs or wait for more
1891 * RPCs to complete */
1892 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1893 cli->cl_w_in_flight--;
1895 cli->cl_r_in_flight--;
1896 osc_wake_cache_waiters(cli);
1897 spin_unlock(&cli->cl_loi_list_lock);
/* Kick the queue: more cached IO may now be eligible to go out. */
1899 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback for a BRW request: once the server has committed the
 * transaction, the written pages are no longer "unstable" and can be
 * accounted as such.  rq_lock serializes against osc_inc_unstable_pages()
 * so exactly one side performs the decrement.
 */
1903 static void brw_commit(struct ptlrpc_request *req)
1905 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1906 * this called via the rq_commit_cb, I need to ensure
1907 * osc_dec_unstable_pages is still called. Otherwise unstable
1908 * pages may be leaked. */
1909 spin_lock(&req->rq_lock);
/* Common case: pages were marked unstable first -- clear and account. */
1910 if (likely(req->rq_unstable)) {
1911 req->rq_unstable = 0;
1912 spin_unlock(&req->rq_lock);
1914 osc_dec_unstable_pages(req);
/* Race case: commit ran first; record it so the other side adjusts. */
1916 req->rq_committed = 1;
1917 spin_unlock(&req->rq_lock);
/*
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * Collects all pages from the extents into a sorted brw_page array,
 * prepares the ptlrpc request, attaches the async-args bookkeeping and
 * hands the request to ptlrpcd.  On failure all extents are finished
 * with the error.
 * NOTE(review): extraction gaps -- several lines (braces, some counters
 * and error paths) are missing from this view.
 */
1926 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1927 struct list_head *ext_list, int cmd)
1929 struct ptlrpc_request *req = NULL;
1930 struct osc_extent *ext;
1931 struct brw_page **pga = NULL;
1932 struct osc_brw_async_args *aa = NULL;
1933 struct obdo *oa = NULL;
1934 struct osc_async_page *oap;
1935 struct osc_object *obj = NULL;
1936 struct cl_req_attr *crattr = NULL;
1937 loff_t starting_offset = OBD_OBJECT_EOF;
1938 loff_t ending_offset = 0;
1942 bool soft_sync = false;
1943 bool interrupted = false;
1947 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1948 struct ost_body *body;
1950 LASSERT(!list_empty(ext_list));
1952 /* add pages into rpc_list to build BRW rpc */
/* First pass: total up pages/grant and memory-pressure state. */
1953 list_for_each_entry(ext, ext_list, oe_link) {
1954 LASSERT(ext->oe_state == OES_RPC);
1955 mem_tight |= ext->oe_memalloc;
1956 grant += ext->oe_grants;
1957 page_count += ext->oe_nr_pages;
1962 soft_sync = osc_over_unstable_soft_limit(cli);
1964 mpflag = cfs_memory_pressure_get_and_set();
1966 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1968 GOTO(out, rc = -ENOMEM);
1972 GOTO(out, rc = -ENOMEM);
/* Second pass: fill pga[], track the RPC's overall offset range, and
 * queue each oap on rpc_list. */
1975 list_for_each_entry(ext, ext_list, oe_link) {
1976 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1978 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1980 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1981 pga[i] = &oap->oap_brw_page;
1982 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1985 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1986 if (starting_offset == OBD_OBJECT_EOF ||
1987 starting_offset > oap->oap_obj_off)
1988 starting_offset = oap->oap_obj_off;
1990 LASSERT(oap->oap_page_off == 0);
1991 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1992 ending_offset = oap->oap_obj_off +
1995 LASSERT(oap->oap_page_off + oap->oap_count ==
1997 if (oap->oap_interrupted)
2002 /* first page in the list */
2003 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Let the cl layer fill in request attributes (jobid, obdo, ...). */
2005 crattr = &osc_env_info(env)->oti_req_attr;
2006 memset(crattr, 0, sizeof(*crattr));
2007 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2008 crattr->cra_flags = ~0ULL;
2009 crattr->cra_page = oap2cl_page(oap);
2010 crattr->cra_oa = oa;
2011 cl_req_attr_set(env, osc2cl(obj), crattr);
2013 if (cmd == OBD_BRW_WRITE)
2014 oa->o_grant_used = grant;
/* Server wants offset order; see sort_brw_pages(). */
2016 sort_brw_pages(pga, page_count);
2017 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2019 CERROR("prep_req failed: %d\n", rc);
2023 req->rq_commit_cb = brw_commit;
2024 req->rq_interpret_reply = brw_interpret;
2025 req->rq_memalloc = mem_tight != 0;
2026 oap->oap_request = ptlrpc_request_addref(req);
2027 if (interrupted && !req->rq_intr)
2028 ptlrpc_mark_interrupted(req);
2030 /* Need to update the timestamps after the request is built in case
2031 * we race with setattr (locally or in queue at OST). If OST gets
2032 * later setattr before earlier BRW (as determined by the request xid),
2033 * the OST will not use BRW timestamps. Sadly, there is no obvious
2034 * way to do this in a single call. bug 10150 */
2035 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2036 crattr->cra_oa = &body->oa;
2037 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
2038 cl_req_attr_set(env, osc2cl(obj), crattr);
2039 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Transfer oap and extent ownership onto the request's async args;
 * brw_interpret() will complete them. */
2041 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2042 aa = ptlrpc_req_async_args(req);
2043 INIT_LIST_HEAD(&aa->aa_oaps);
2044 list_splice_init(&rpc_list, &aa->aa_oaps);
2045 INIT_LIST_HEAD(&aa->aa_exts);
2046 list_splice_init(ext_list, &aa->aa_exts);
/* Account the RPC in flight and feed the lprocfs histograms. */
2048 spin_lock(&cli->cl_loi_list_lock);
2049 starting_offset >>= PAGE_SHIFT;
2050 if (cmd == OBD_BRW_READ) {
2051 cli->cl_r_in_flight++;
2052 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2053 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2054 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2055 starting_offset + 1);
2057 cli->cl_w_in_flight++;
2058 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2059 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2060 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2061 starting_offset + 1);
2063 spin_unlock(&cli->cl_loi_list_lock);
2065 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2066 page_count, aa, cli->cl_r_in_flight,
2067 cli->cl_w_in_flight);
2068 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2070 ptlrpcd_add_req(req);
2076 cfs_memory_pressure_restore(mpflag);
/* Error path: nothing was handed off; free what we allocated and fail
 * every extent. */
2079 LASSERT(req == NULL);
2084 OBD_FREE(pga, sizeof(*pga) * page_count);
2085 /* this should happen rarely and is pretty bad, it makes the
2086 * pending list not follow the dirty order */
2087 while (!list_empty(ext_list)) {
2088 ext = list_entry(ext_list->next, struct osc_extent,
2090 list_del_init(&ext->oe_link);
2091 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data (per the callers here, the einfo->ei_cbdata / osc object)
 * to @lock's l_ast_data, unless a different owner already claimed it.
 * The visible flow leaves l_ast_data == data as the "claimed by us"
 * condition; NOTE(review): the actual return statements are missing
 * from this extract -- presumably non-zero on success, confirm in the
 * full source.
 */
2097 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2101 LASSERT(lock != NULL);
2103 lock_res_and_lock(lock);
/* Claim only if unclaimed; idempotent if already ours. */
2105 if (lock->l_ast_data == NULL)
2106 lock->l_ast_data = data;
2107 if (lock->l_ast_data == data)
2110 unlock_res_and_lock(lock);
/*
 * Common completion for an OSC lock enqueue: extract the real error code
 * from an intent reply, mark the LVB ready where appropriate, invoke the
 * caller's upcall with the outcome, and drop the enqueue references.
 */
2115 static int osc_enqueue_fini(struct ptlrpc_request *req,
2116 osc_enqueue_upcall_f upcall, void *cookie,
2117 struct lustre_handle *lockh, enum ldlm_mode mode,
2118 __u64 *flags, bool speculative, int errcode)
2120 bool intent = *flags & LDLM_FL_HAS_INTENT;
2124 /* The request was created before ldlm_cli_enqueue call. */
2125 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2126 struct ldlm_reply *rep;
2128 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2129 LASSERT(rep != NULL);
/* lock_policy_res1 carries the server's real status for an aborted
 * intent lock; convert from wire byte order / sign convention. */
2131 rep->lock_policy_res1 =
2132 ptlrpc_status_ntoh(rep->lock_policy_res1);
2133 if (rep->lock_policy_res1)
2134 errcode = rep->lock_policy_res1;
2136 *flags |= LDLM_FL_LVB_READY;
2137 } else if (errcode == ELDLM_OK) {
2138 *flags |= LDLM_FL_LVB_READY;
2141 /* Call the update callback. */
2142 rc = (*upcall)(cookie, lockh, errcode);
2144 /* release the reference taken in ldlm_cli_enqueue() */
2145 if (errcode == ELDLM_LOCK_MATCHED)
2147 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2148 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for an asynchronous lock enqueue.  Completes
 * the DLM side via ldlm_cli_enqueue_fini() and the OSC side via
 * osc_enqueue_fini(), holding an extra lock reference across the upcall
 * so a blocking AST cannot race ahead of it.
 */
2153 static int osc_enqueue_interpret(const struct lu_env *env,
2154 struct ptlrpc_request *req,
2155 struct osc_enqueue_args *aa, int rc)
2157 struct ldlm_lock *lock;
2158 struct lustre_handle *lockh = &aa->oa_lockh;
2159 enum ldlm_mode mode = aa->oa_mode;
2160 struct ost_lvb *lvb = aa->oa_lvb;
2161 __u32 lvb_len = sizeof(*lvb);
2166 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2168 lock = ldlm_handle2lock(lockh);
2169 LASSERTF(lock != NULL,
2170 "lockh %#llx, req %p, aa %p - client evicted?\n",
2171 lockh->cookie, req, aa);
2173 /* Take an additional reference so that a blocking AST that
2174 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2175 * to arrive after an upcall has been executed by
2176 * osc_enqueue_fini(). */
2177 ldlm_lock_addref(lockh, mode);
2179 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2180 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2182 /* Let CP AST to grant the lock first. */
2183 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* Speculative enqueues carry no LVB/flags pointers from the caller;
 * supply a local flags word for the fini calls below. */
2185 if (aa->oa_speculative) {
2186 LASSERT(aa->oa_lvb == NULL);
2187 LASSERT(aa->oa_flags == NULL);
2188 aa->oa_flags = &flags;
2191 /* Complete obtaining the lock procedure. */
2192 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2193 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2195 /* Complete osc stuff. */
2196 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2197 aa->oa_flags, aa->oa_speculative, rc);
2199 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above and the handle2lock ref. */
2201 ldlm_lock_decref(lockh, mode);
2202 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to have the
 * enqueue handed straight to ptlrpcd instead of a real request set
 * (see the rqset == PTLRPCD_SET check in osc_enqueue_base()).  Never
 * dereferenced -- compared by address only. */
2206 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on an OST object: first try to match an already
 * cached lock, otherwise send a (possibly intent, possibly async) DLM
 * enqueue.  @upcall(cookie, ...) is invoked with the final result either
 * here or from osc_enqueue_interpret().
 * NOTE(review): extraction gaps -- some lines (speculative flag checks,
 * a few braces/returns) are missing from this view.
 */
2215 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2216 __u64 *flags, union ldlm_policy_data *policy,
2217 struct ost_lvb *lvb, int kms_valid,
2218 osc_enqueue_upcall_f upcall, void *cookie,
2219 struct ldlm_enqueue_info *einfo,
2220 struct ptlrpc_request_set *rqset, int async,
2223 struct obd_device *obd = exp->exp_obd;
2224 struct lustre_handle lockh = { 0 };
2225 struct ptlrpc_request *req = NULL;
2226 int intent = *flags & LDLM_FL_HAS_INTENT;
2227 __u64 match_flags = *flags;
2228 enum ldlm_mode mode;
2232 /* Filesystem lock extents are extended to page boundaries so that
2233 * dealing with the page cache is a little smoother. */
2234 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2235 policy->l_extent.end |= ~PAGE_MASK;
2238 * kms is not valid when either object is completely fresh (so that no
2239 * locks are cached), or object was evicted. In the latter case cached
2240 * lock cannot be used, because it would prime inode state with
2241 * potentially stale LVB.
2246 /* Next, search for already existing extent locks that will cover us */
2247 /* If we're trying to read, we also search for an existing PW lock. The
2248 * VFS and page cache already protect us locally, so lots of readers/
2249 * writers can share a single PW lock.
2251 * There are problems with conversion deadlocks, so instead of
2252 * converting a read lock to a write lock, we'll just enqueue a new
2255 * At some point we should cancel the read lock instead of making them
2256 * send us a blocking callback, but there are problems with canceling
2257 * locks out from other users right now, too. */
2258 mode = einfo->ei_mode;
2259 if (einfo->ei_mode == LCK_PR)
2261 /* Normal lock requests must wait for the LVB to be ready before
2262 * matching a lock; speculative lock requests do not need to,
2263 * because they will not actually use the lock. */
2265 match_flags |= LDLM_FL_LVB_READY;
2267 match_flags |= LDLM_FL_BLOCK_GRANTED;
2268 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2269 einfo->ei_type, policy, mode, &lockh, 0);
/* A matching cached lock was found (mode != 0). */
2271 struct ldlm_lock *matched;
2273 if (*flags & LDLM_FL_TEST_LOCK)
2276 matched = ldlm_handle2lock(&lockh);
2278 /* This DLM lock request is speculative, and does not
2279 * have an associated IO request. Therefore if there
2280 * is already a DLM lock, it wll just inform the
2281 * caller to cancel the request for this stripe.*/
2282 lock_res_and_lock(matched);
2283 if (ldlm_extent_equal(&policy->l_extent,
2284 &matched->l_policy_data.l_extent))
2288 unlock_res_and_lock(matched);
2290 ldlm_lock_decref(&lockh, mode);
2291 LDLM_LOCK_PUT(matched);
/* Non-speculative: claim the matched lock for this osc object and
 * report ELDLM_LOCK_MATCHED to the caller. */
2293 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2294 *flags |= LDLM_FL_LVB_READY;
2296 /* We already have a lock, and it's referenced. */
2297 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2299 ldlm_lock_decref(&lockh, mode);
2300 LDLM_LOCK_PUT(matched);
2303 ldlm_lock_decref(&lockh, mode);
2304 LDLM_LOCK_PUT(matched);
2309 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No usable cached lock: build a fresh enqueue request with room for
 * the server-returned LVB. */
2313 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2314 &RQF_LDLM_ENQUEUE_LVB);
2318 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2320 ptlrpc_request_free(req);
2324 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2326 ptlrpc_request_set_replen(req);
2329 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2330 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2332 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2333 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash completion state and let osc_enqueue_interpret()
 * finish the job from ptlrpcd or the caller's set. */
2336 struct osc_enqueue_args *aa;
2337 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2338 aa = ptlrpc_req_async_args(req);
2340 aa->oa_mode = einfo->ei_mode;
2341 aa->oa_type = einfo->ei_type;
2342 lustre_handle_copy(&aa->oa_lockh, &lockh);
2343 aa->oa_upcall = upcall;
2344 aa->oa_cookie = cookie;
2345 aa->oa_speculative = speculative;
2347 aa->oa_flags = flags;
2350 /* speculative locks are essentially to enqueue
2351 * a DLM lock in advance, so we don't care
2352 * about the result of the enqueue. */
2354 aa->oa_flags = NULL;
2357 req->rq_interpret_reply =
2358 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2359 if (rqset == PTLRPCD_SET)
2360 ptlrpcd_add_req(req);
2362 ptlrpc_set_add_req(rqset, req);
2363 } else if (intent) {
2364 ptlrpc_req_finished(req);
/* Sync path: complete inline. */
2369 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2370 flags, speculative, rc);
2372 ptlrpc_req_finished(req);
/*
 * Look up a cached DLM extent lock matching @policy/@mode on @res_id
 * without enqueuing.  On a hit, attach @data to the lock's l_ast_data
 * (dropping the match reference if another object owns it).  Returns the
 * matched mode per ldlm_lock_match() convention (0 on miss).
 */
2377 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2378 enum ldlm_type type, union ldlm_policy_data *policy,
2379 enum ldlm_mode mode, __u64 *flags, void *data,
2380 struct lustre_handle *lockh, int unref)
2382 struct obd_device *obd = exp->exp_obd;
2383 __u64 lflags = *flags;
/* Fault-injection point for testing the lockless path. */
2387 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2390 /* Filesystem lock extents are extended to page boundaries so that
2391 * dealing with the page cache is a little smoother */
2392 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2393 policy->l_extent.end |= ~PAGE_MASK;
2395 /* Next, search for already existing extent locks that will cover us */
2396 /* If we're trying to read, we also search for an existing PW lock. The
2397 * VFS and page cache already protect us locally, so lots of readers/
2398 * writers can share a single PW lock. */
2402 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2403 res_id, type, policy, rc, lockh, unref);
/* TEST_LOCK callers only want to know whether a lock exists. */
2404 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2408 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2410 LASSERT(lock != NULL);
/* Another object already owns this lock's ast_data -- release the
 * reference ldlm_lock_match() took and report accordingly. */
2411 if (!osc_set_lock_data(lock, data)) {
2412 ldlm_lock_decref(lockh, rc);
2415 LDLM_LOCK_PUT(lock);
/*
 * Reply-interpret callback for an asynchronous OST_STATFS request:
 * copies the server-provided obd_statfs into the caller's buffer and
 * then invokes the caller's completion callback (oi_cb_up).
 */
2420 static int osc_statfs_interpret(const struct lu_env *env,
2421 struct ptlrpc_request *req,
2422 struct osc_async_args *aa, int rc)
2424 struct obd_statfs *msfs;
2428 /* The request has in fact never been sent
2429 * due to issues at a higher level (LOV).
2430 * Exit immediately since the caller is
2431 * aware of the problem and takes care
2432 * of the clean up */
/* For NODELAY statfs (e.g. procfs), connection trouble is expected
 * and not propagated as a hard error. */
2435 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2436 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
/* Missing/undecodable statfs buffer in the reply is a protocol error. */
2442 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2444 GOTO(out, rc = -EPROTO);
2447 *aa->aa_oi->oi_osfs = *msfs;
/* Deliver the final status to the upper layer's callback. */
2449 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_STATFS request on the client import and add
 * it to @rqset; the reply is handled by osc_statfs_interpret() via the
 * osc_async_args stashed in rq_async_args.
 *
 * NOTE(review): max_age is accepted but, as the comment below explains,
 * not transmitted in the request.
 */
2453 static int osc_statfs_async(struct obd_export *exp,
2454 struct obd_info *oinfo, __u64 max_age,
2455 struct ptlrpc_request_set *rqset)
2457 struct obd_device *obd = class_exp2obd(exp);
2458 struct ptlrpc_request *req;
2459 struct osc_async_args *aa;
2463 /* We could possibly pass max_age in the request (as an absolute
2464 * timestamp or a "seconds.usec ago") so the target can avoid doing
2465 * extra calls into the filesystem if that isn't necessary (e.g.
2466 * during mount that would help a bit). Having relative timestamps
2467 * is not so great if request processing is slow, while absolute
2468 * timestamps are not ideal because they need time synchronization. */
2469 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2473 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2475 ptlrpc_request_free(req);
2478 ptlrpc_request_set_replen(req);
/* statfs is served from the create portal on the OST. */
2479 req->rq_request_portal = OST_CREATE_PORTAL;
2480 ptlrpc_at_set_req_timeout(req);
2482 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2483 /* procfs statfs must not block waiting for recovery — fail fast
* instead of resending/delaying, to avoid deadlock */
2484 req->rq_no_resend = 1;
2485 req->rq_no_delay = 1;
2488 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
/* osc_async_args must fit in the request's embedded scratch space. */
2489 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2490 aa = ptlrpc_req_async_args(req);
2493 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: builds the request, waits for the reply with
 * ptlrpc_queue_wait(), and copies the result back to @osfs.
 *
 * A reference is taken on the import under cl_sem so the import cannot
 * be torn down by a concurrent disconnect while the RPC is in flight.
 */
2497 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2498 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2500 struct obd_device *obd = class_exp2obd(exp);
2501 struct obd_statfs *msfs;
2502 struct ptlrpc_request *req;
2503 struct obd_import *imp = NULL;
2507 /* Since the request might also come from lprocfs, we need to
2508 * synchronize with client_disconnect_export() — see bug 15684. */
2509 down_read(&obd->u.cli.cl_sem);
2510 if (obd->u.cli.cl_import)
2511 imp = class_import_get(obd->u.cli.cl_import);
2512 up_read(&obd->u.cli.cl_sem);
2516 /* We could possibly pass max_age in the request (as an absolute
2517 * timestamp or a "seconds.usec ago") so the target can avoid doing
2518 * extra calls into the filesystem if that isn't necessary (e.g.
2519 * during mount that would help a bit). Having relative timestamps
2520 * is not so great if request processing is slow, while absolute
2521 * timestamps are not ideal because they need time synchronization. */
2522 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference is only needed for request allocation. */
2524 class_import_put(imp);
2529 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2531 ptlrpc_request_free(req);
2534 ptlrpc_request_set_replen(req);
2535 req->rq_request_portal = OST_CREATE_PORTAL;
2536 ptlrpc_at_set_req_timeout(req);
2538 if (flags & OBD_STATFS_NODELAY) {
2539 /* procfs statfs must not block waiting for recovery — fail fast
* instead of resending/delaying, to avoid deadlock */
2540 req->rq_no_resend = 1;
2541 req->rq_no_delay = 1;
2544 rc = ptlrpc_queue_wait(req);
/* Absent statfs buffer in the reply is a protocol error. */
2548 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2550 GOTO(out, rc = -EPROTO);
2557 ptlrpc_req_finished(req);
/*
 * ioctl dispatcher for the OSC device. Pins this module for the
 * duration of the call (try_module_get/module_put) and handles the
 * small set of OSC-level ioctls; anything else returns -ENOTTY.
 */
2561 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2562 void *karg, void __user *uarg)
2564 struct obd_device *obd = exp->exp_obd;
2565 struct obd_ioctl_data *data = karg;
/* Prevent module unload while the ioctl handler runs. */
2569 if (!try_module_get(THIS_MODULE)) {
2570 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2571 module_name(THIS_MODULE));
/* Force recovery of the import (target named in inlbuf1). */
2575 case OBD_IOC_CLIENT_RECOVER:
2576 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2577 data->ioc_inlbuf1, 0);
/* Administratively activate/deactivate the import. */
2581 case IOC_OSC_SET_ACTIVE:
2582 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
/* Send an OBD ping to the target. */
2585 case OBD_IOC_PING_TARGET:
2586 err = ptlrpc_obd_ping(obd);
2589 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2590 cmd, current_comm());
2591 GOTO(out, err = -ENOTTY);
2594 module_put(THIS_MODULE);
/*
 * osc_set_info_async() - set a named parameter on this OSC, either
 * purely locally (checksum flag, sptlrpc config, cache handles, LRU
 * shrink) or by forwarding an OST_SET_INFO RPC to the server.
 *
 * Locally-handled keys return without building a request. All other
 * keys are packed into an OST_SET_INFO request; KEY_GRANT_SHRINK uses
 * the special RQF_OST_SET_GRANT_INFO format and is queued on the
 * ptlrpcd daemons instead of the caller's @set.
 */
2598 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2599 u32 keylen, void *key,
2600 u32 vallen, void *val,
2601 struct ptlrpc_request_set *set)
2603 struct ptlrpc_request *req;
2604 struct obd_device *obd = exp->exp_obd;
2605 struct obd_import *imp = class_exp2cliimp(exp);
2610 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Toggle client-side checksumming; handled locally. */
2612 if (KEY_IS(KEY_CHECKSUM)) {
2613 if (vallen != sizeof(int))
2615 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* Re-read sptlrpc configuration; handled locally. */
2619 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2620 sptlrpc_conf_client_adapt(obd);
/* Flush this client's security contexts; handled locally. */
2624 if (KEY_IS(KEY_FLUSH_CTX)) {
2625 sptlrpc_import_flush_my_ctx(imp);
/* Attach this OSC to a shared client cache (set once at setup). */
2629 if (KEY_IS(KEY_CACHE_SET)) {
2630 struct client_obd *cli = &obd->u.cli;
2632 LASSERT(cli->cl_cache == NULL); /* only once */
2633 cli->cl_cache = (struct cl_client_cache *)val;
2634 cl_cache_incref(cli->cl_cache);
2635 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2637 /* add this osc into entity list */
2638 LASSERT(list_empty(&cli->cl_lru_osc));
2639 spin_lock(&cli->cl_cache->ccc_lru_lock);
2640 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2641 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Shrink the page LRU: release up to min(half the list, target). */
2646 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2647 struct client_obd *cli = &obd->u.cli;
2648 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2649 long target = *(long *)val;
2651 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* Everything below builds an RPC; a request set is mandatory except
 * for grant shrink, which goes through ptlrpcd. */
2656 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2659 /* We pass all other commands directly to OST. Since nobody calls osc
2660 methods directly and everybody is supposed to go through LOV, we
2661 assume lov checked invalid values for us.
2662 The only recognised values so far are evict_by_nid and mds_conn.
2663 Even if something bad goes through, we'd get a -EINVAL from OST
2666 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2667 &RQF_OST_SET_GRANT_INFO :
2672 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2673 RCL_CLIENT, keylen);
/* Grant-shrink carries its value in an ost_body, not SETINFO_VAL. */
2674 if (!KEY_IS(KEY_GRANT_SHRINK))
2675 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2676 RCL_CLIENT, vallen);
2677 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2679 ptlrpc_request_free(req);
2683 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2684 memcpy(tmp, key, keylen);
2685 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2688 memcpy(tmp, val, vallen);
2690 if (KEY_IS(KEY_GRANT_SHRINK)) {
2691 struct osc_grant_args *aa;
2694 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2695 aa = ptlrpc_req_async_args(req);
2698 ptlrpc_req_finished(req);
2701 *oa = ((struct ost_body *)val)->oa;
2703 req->rq_interpret_reply = osc_shrink_grant_interpret;
2706 ptlrpc_request_set_replen(req);
/* Non-shrink requests go on the caller's set and are kicked once;
 * grant-shrink requests are handed to the ptlrpcd daemons. */
2707 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2708 LASSERT(set != NULL);
2709 ptlrpc_set_add_req(set, req);
2710 ptlrpc_check_set(NULL, set);
2712 ptlrpcd_add_req(req);
/*
 * Reconnect hook: when the connection supports grants, recompute the
 * grant amount to request from the server from the client's current
 * available/reserved/dirty grant state, and reset the lost-grant
 * counter. All grant fields are read under cl_loi_list_lock.
 */
2718 static int osc_reconnect(const struct lu_env *env,
2719 struct obd_export *exp, struct obd_device *obd,
2720 struct obd_uuid *cluuid,
2721 struct obd_connect_data *data,
2724 struct client_obd *cli = &obd->u.cli;
2726 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2730 spin_lock(&cli->cl_loi_list_lock);
2731 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* With GRANT_PARAM the server accounts dirty data in grant bytes;
 * otherwise convert dirty pages to bytes. */
2732 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2733 grant += cli->cl_dirty_grant;
2735 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* Never ask for zero grant: fall back to two full BRW RPCs worth. */
2736 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2737 lost_grant = cli->cl_lost_grant;
2738 cli->cl_lost_grant = 0;
2739 spin_unlock(&cli->cl_loi_list_lock);
2741 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2742 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2743 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect hook: tears down the export via the generic client path,
 * then — only after the import is known to be gone — removes this
 * client from the grant-shrink list (ordering rationale below).
 */
2749 static int osc_disconnect(struct obd_export *exp)
2751 struct obd_device *obd = class_exp2obd(exp);
2754 rc = client_disconnect_export(exp);
2756 * Initially we put del_shrink_grant before disconnect_export, but it
2757 * causes the following problem if setup (connect) and cleanup
2758 * (disconnect) are tangled together.
2759 * connect p1 disconnect p2
2760 * ptlrpc_connect_import
2761 * ............... class_manual_cleanup
2764 * ptlrpc_connect_interrupt
2766 * add this client to shrink list
2768 * Bang! pinger trigger the shrink.
2769 * So the osc should be disconnected from the shrink list, after we
2770 * are sure the import has been destroyed. BUG18662
2772 if (obd->u.cli.cl_import == NULL)
2773 osc_del_shrink_grant(&obd->u.cli);
/*
 * cfs_hash iterator callback: for one LDLM resource, find the OSC
 * object attached to its granted locks (l_ast_data), clear the CLEANED
 * flag on every granted lock so a second namespace-cleanup pass can
 * cancel them, then invalidate the OSC object.
 *
 * Called from osc_import_event(IMP_EVENT_INVALIDATE) with a lu_env as
 * @arg.
 */
2777 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2778 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2780 struct lu_env *env = arg;
2781 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2782 struct ldlm_lock *lock;
2783 struct osc_object *osc = NULL;
2787 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* All granted locks on a resource share the same OSC object; grab
 * a reference to the first one seen. */
2788 if (lock->l_ast_data != NULL && osc == NULL) {
2789 osc = lock->l_ast_data;
2790 cl_object_get(osc2cl(osc));
2793 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2794 * by the 2nd round of ldlm_namespace_clean() call in
2795 * osc_import_event(). */
2796 ldlm_clear_cleaned(lock);
/* Invalidate the object and drop the reference taken above. */
2801 osc_object_invalidate(env, osc);
2802 cl_object_put(env, osc2cl(osc));
/*
 * Import state-machine callback for the OSC: reacts to connection
 * events (disconnect, invalidate, (in)activation, OCD negotiation)
 * and forwards most of them to the observer (typically LOV) via
 * obd_notify_observer().
 */
2808 static int osc_import_event(struct obd_device *obd,
2809 struct obd_import *imp,
2810 enum obd_import_event event)
2812 struct client_obd *cli;
2816 LASSERT(imp->imp_obd == obd);
/* Connection lost: all outstanding grant is void. */
2819 case IMP_EVENT_DISCON: {
2821 spin_lock(&cli->cl_loi_list_lock);
2822 cli->cl_avail_grant = 0;
2823 cli->cl_lost_grant = 0;
2824 spin_unlock(&cli->cl_loi_list_lock);
2827 case IMP_EVENT_INACTIVE: {
2828 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
/* Import invalidated: flush dirty pages, invalidate every cached
 * object, and clean the LDLM namespace twice (the resource iterator
 * clears LDLM_FL_CLEANED between the two passes). */
2831 case IMP_EVENT_INVALIDATE: {
2832 struct ldlm_namespace *ns = obd->obd_namespace;
2836 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2838 env = cl_env_get(&refcheck);
2840 osc_io_unplug(env, &obd->u.cli, NULL);
2842 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2843 osc_ldlm_resource_invalidate,
2845 cl_env_put(env, &refcheck);
2847 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2852 case IMP_EVENT_ACTIVE: {
2853 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
/* Connect data negotiated: (re)initialize grant accounting and pick
 * the request portal the server advertised. */
2856 case IMP_EVENT_OCD: {
2857 struct obd_connect_data *ocd = &imp->imp_connect_data;
2859 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2860 osc_init_grant(&obd->u.cli, ocd);
2863 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2864 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2866 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2869 case IMP_EVENT_DEACTIVATE: {
2870 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2873 case IMP_EVENT_ACTIVATE: {
2874 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2878 CERROR("Unknown import event %d\n", event);
2885 * Determine whether the lock can be canceled before replaying the lock
2886 * during recovery, see bug16774 for detailed information.
2888 * \retval zero the lock can't be canceled
2889 * \retval other ok to cancel
2891 static int osc_cancel_weight(struct ldlm_lock *lock)
2894 * Cancel all unused and granted extent lock.
/* A lock is cancelable when it is an extent lock, fully granted
 * (granted mode == requested mode), and its weigh AST reports it
 * unused (weight 0). */
2896 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2897 lock->l_granted_mode == lock->l_req_mode &&
2898 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback (registered in osc_setup() as
 * cl_writeback_work): flush pending cached I/O for this client by
 * unplugging its I/O queues.
 */
2904 static int brw_queue_work(const struct lu_env *env, void *data)
2906 struct client_obd *cli = data;
2908 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2910 osc_io_unplug(env, cli, NULL);
/*
 * osc_setup() - device setup for the OSC obd:
 *  - takes a ptlrpcd reference and runs generic client setup;
 *  - allocates the writeback and LRU ptlrpcd work items;
 *  - sets up quota, grant-shrink interval, and procfs entries
 *    (under the OSP's proc directory when OSC and OSP share a node);
 *  - tops up the shared OSC request pool;
 *  - registers the cancel-weight callback and joins the global
 *    cache-shrink list.
 * On failure the work items are destroyed and client setup is undone
 * (cleanup path at the end of the function).
 */
2914 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2916 struct client_obd *cli = &obd->u.cli;
2917 struct obd_type *type;
2925 rc = ptlrpcd_addref();
2929 rc = client_obd_setup(obd, lcfg);
2931 GOTO(out_ptlrpcd, rc);
/* Writeback work item, serviced by ptlrpcd (see brw_queue_work()). */
2933 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2934 if (IS_ERR(handler))
2935 GOTO(out_client_setup, rc = PTR_ERR(handler));
2936 cli->cl_writeback_work = handler;
/* LRU-shrink work item. */
2938 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2939 if (IS_ERR(handler))
2940 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2941 cli->cl_lru_work = handler;
2943 rc = osc_quota_setup(obd);
2945 GOTO(out_ptlrpcd_work, rc);
2947 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2949 #ifdef CONFIG_PROC_FS
2950 obd->obd_vars = lprocfs_osc_obd_vars;
2952 /* If this is true then both client (osc) and server (osp) are on the
2953 * same node. The osp layer if loaded first will register the osc proc
2954 * directory. In that case this obd_device will be attached its proc
2955 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
2957 type = class_search_type(LUSTRE_OSP_NAME);
2958 if (type && type->typ_procsym) {
2959 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2961 obd->obd_vars, obd);
/* proc registration failure is non-fatal: log and continue. */
2962 if (IS_ERR(obd->obd_proc_entry)) {
2963 rc = PTR_ERR(obd->obd_proc_entry);
2964 CERROR("error %d setting up lprocfs for %s\n", rc,
2966 obd->obd_proc_entry = NULL;
2970 rc = lprocfs_obd_setup(obd, false);
2972 /* If the basic OSC proc tree construction succeeded then
2975 lproc_osc_attach_seqstat(obd);
2976 sptlrpc_lprocfs_cliobd_attach(obd);
2977 ptlrpc_lprocfs_register_obd(obd);
2981 * We try to control the total number of requests with a upper limit
2982 * osc_reqpool_maxreqcount. There might be some race which will cause
2983 * over-limit allocation, but it is fine.
2985 req_count = atomic_read(&osc_pool_req_count);
2986 if (req_count < osc_reqpool_maxreqcount) {
/* Reserve room for this client's RPCs in flight, plus slack,
 * clamped to the pool-wide maximum. */
2987 adding = cli->cl_max_rpcs_in_flight + 2;
2988 if (req_count + adding > osc_reqpool_maxreqcount)
2989 adding = osc_reqpool_maxreqcount - req_count;
2991 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2992 atomic_add(added, &osc_pool_req_count);
2995 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2996 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Join the global list consulted by the memory shrinker. */
2998 spin_lock(&osc_shrink_lock);
2999 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3000 spin_unlock(&osc_shrink_lock);
/* Error cleanup: release work items and undo client setup. */
3005 if (cli->cl_writeback_work != NULL) {
3006 ptlrpcd_destroy_work(cli->cl_writeback_work);
3007 cli->cl_writeback_work = NULL;
3009 if (cli->cl_lru_work != NULL) {
3010 ptlrpcd_destroy_work(cli->cl_lru_work);
3011 cli->cl_lru_work = NULL;
3014 client_obd_cleanup(obd);
/*
 * Pre-cleanup hook: waits out zombie exports, destroys the ptlrpcd
 * work items created in osc_setup(), cleans up the client import and
 * tears down the procfs entries.
 */
3020 static int osc_precleanup(struct obd_device *obd)
3022 struct client_obd *cli = &obd->u.cli;
3026 * for echo client, export may be on zombie list, wait for
3027 * zombie thread to cull it, because cli.cl_import will be
3028 * cleared in client_disconnect_export():
3029 * class_export_destroy() -> obd_cleanup() ->
3030 * echo_device_free() -> echo_client_cleanup() ->
3031 * obd_disconnect() -> osc_disconnect() ->
3032 * client_disconnect_export()
3034 obd_zombie_barrier();
3035 if (cli->cl_writeback_work) {
3036 ptlrpcd_destroy_work(cli->cl_writeback_work);
3037 cli->cl_writeback_work = NULL;
3040 if (cli->cl_lru_work) {
3041 ptlrpcd_destroy_work(cli->cl_lru_work);
3042 cli->cl_lru_work = NULL;
3045 obd_cleanup_client_import(obd);
3046 ptlrpc_lprocfs_unregister_obd(obd);
3047 lprocfs_obd_cleanup(obd);
/*
 * Final cleanup: leaves the global shrink list, detaches from the
 * shared client cache (undoing KEY_CACHE_SET), frees the quota cache
 * and finishes with generic client cleanup.
 */
3051 int osc_cleanup(struct obd_device *obd)
3053 struct client_obd *cli = &obd->u.cli;
3058 spin_lock(&osc_shrink_lock);
3059 list_del(&cli->cl_shrink_list);
3060 spin_unlock(&osc_shrink_lock);
/* Detach from the shared cl_client_cache set up via KEY_CACHE_SET. */
3063 if (cli->cl_cache != NULL) {
3064 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3065 spin_lock(&cli->cl_cache->ccc_lru_lock);
3066 list_del_init(&cli->cl_lru_osc);
3067 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3068 cli->cl_lru_left = NULL;
3069 cl_cache_decref(cli->cl_cache);
3070 cli->cl_cache = NULL;
3073 /* free memory of osc quota cache */
3074 osc_quota_cleanup(obd);
3076 rc = client_obd_cleanup(obd);
/*
 * Apply a single configuration record to this OSC's proc parameters.
 * class_process_proc_param() returns >0 for "handled"; normalize that
 * to 0 so callers only see 0 or a negative errno.
 */
3082 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3084 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3085 return rc > 0 ? 0: rc;
/* obd_ops wrapper: forward the config buffer to osc_process_config_base(). */
3088 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3090 return osc_process_config_base(obd, buf);
/*
 * obd_ops method table for the OSC: lifecycle hooks implemented here,
 * connection management delegated to the generic client_* helpers.
 */
3093 static struct obd_ops osc_obd_ops = {
3094 .o_owner = THIS_MODULE,
3095 .o_setup = osc_setup,
3096 .o_precleanup = osc_precleanup,
3097 .o_cleanup = osc_cleanup,
3098 .o_add_conn = client_import_add_conn,
3099 .o_del_conn = client_import_del_conn,
3100 .o_connect = client_connect_import,
3101 .o_reconnect = osc_reconnect,
3102 .o_disconnect = osc_disconnect,
3103 .o_statfs = osc_statfs,
3104 .o_statfs_async = osc_statfs_async,
3105 .o_create = osc_create,
3106 .o_destroy = osc_destroy,
3107 .o_getattr = osc_getattr,
3108 .o_setattr = osc_setattr,
3109 .o_iocontrol = osc_iocontrol,
3110 .o_set_info_async = osc_set_info_async,
3111 .o_import_event = osc_import_event,
3112 .o_process_config = osc_process_config,
3113 .o_quotactl = osc_quotactl,
/* Memory shrinker registered in osc_init() (unregistered in osc_exit()). */
3116 static struct shrinker *osc_cache_shrinker;
/* Global list of client_obd's participating in cache shrinking
 * (clients join in osc_setup(), leave in osc_cleanup()), protected
 * by osc_shrink_lock. */
3117 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3118 DEFINE_SPINLOCK(osc_shrink_lock);
3120 #ifndef HAVE_SHRINKER_COUNT
/*
 * Compatibility wrapper for kernels whose shrinker API has a single
 * shrink callback instead of separate count/scan methods: perform the
 * scan, then report the remaining count.
 */
3121 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3123 struct shrink_control scv = {
3124 .nr_to_scan = shrink_param(sc, nr_to_scan),
3125 .gfp_mask = shrink_param(sc, gfp_mask)
3127 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3128 struct shrinker *shrinker = NULL;
3131 (void)osc_cache_shrink_scan(shrinker, &scv);
3133 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: set up kmem caches, register the OSC obd type (proc
 * disabled when an OSP is already providing the proc directory),
 * register the cache shrinker, and size + allocate the shared OSC
 * request pool from osc_reqpool_mem_max (MB).
 */
3137 static int __init osc_init(void)
3139 bool enable_proc = true;
3140 struct obd_type *type;
3141 unsigned int reqpool_size;
3142 unsigned int reqsize;
3144 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3145 osc_cache_shrink_count, osc_cache_shrink_scan);
3148 /* print an address of _any_ initialized kernel symbol from this
3149 * module, to allow debugging with gdb that doesn't support data
3150 * symbols from modules.*/
3151 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3153 rc = lu_kmem_init(osc_caches);
/* If an OSP is loaded it owns the shared proc directory, so skip
 * registering our own proc entries here. */
3157 type = class_search_type(LUSTRE_OSP_NAME);
3158 if (type != NULL && type->typ_procsym != NULL)
3159 enable_proc = false;
3161 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3162 LUSTRE_OSC_NAME, &osc_device_type);
3166 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3168 /* This is obviously too much memory, only prevent overflow here */
3169 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3170 GOTO(out_type, rc = -EINVAL);
/* Convert the MB module parameter to bytes. */
3172 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to the next power of two that can
 * hold a maximum-size OST I/O request. */
3175 while (reqsize < OST_IO_MAXREQSIZE)
3176 reqsize = reqsize << 1;
3179 * We don't enlarge the request count in OSC pool according to
3180 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3181 * tried after normal allocation failed. So a small OSC pool won't
3182 * cause much performance degression in most of cases.
3184 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3186 atomic_set(&osc_pool_req_count, 0);
3187 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3188 ptlrpc_add_rqs_to_pool);
3190 if (osc_rq_pool != NULL)
/* Error path: undo type registration and kmem caches. */
3194 class_unregister_type(LUSTRE_OSC_NAME);
3196 lu_kmem_fini(osc_caches);
/* Module exit: release everything osc_init() acquired, in reverse order. */
3201 static void __exit osc_exit(void)
3203 remove_shrinker(osc_cache_shrinker);
3204 class_unregister_type(LUSTRE_OSC_NAME);
3205 lu_kmem_fini(osc_caches);
3206 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit registration. */
3209 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3210 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3211 MODULE_VERSION(LUSTRE_VERSION_STRING);
3212 MODULE_LICENSE("GPL");
3214 module_init(osc_init);
3215 module_exit(osc_exit);