4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
/*
 * Per-RPC state for an async bulk read/write (BRW), stored in the
 * request's rq_async_args and consumed by brw_interpret().
 * NOTE(review): excerpt drops lines (embedded numbering jumps 55->61,
 * 66->69); leading fields and the closing brace are not visible here.
 */
55 struct osc_brw_async_args {
61 struct brw_page **aa_ppga;       /* page descriptors for the bulk */
62 struct client_obd *aa_cli;       /* owning client obd */
63 struct list_head aa_oaps;        /* osc_async_pages in this RPC */
64 struct list_head aa_exts;        /* osc_extents covered by this RPC */
65 struct obd_capa *aa_ocapa;       /* capability held for the I/O */
66 struct cl_req *aa_clerq;         /* cl_req transfer descriptor */
/* Grant-shrink RPCs reuse the same async-args layout under another name. */
69 #define osc_grant_args osc_brw_async_args
/*
 * Async-args for setattr/punch RPCs; sa_upcall is invoked from the
 * interpret callback with the RPC result.
 * NOTE(review): excerpt incomplete — fields such as the obdo pointer
 * (sa_oa) and cookie (sa_cookie) referenced elsewhere are not visible.
 */
71 struct osc_setattr_args {
73 obd_enqueue_update_f sa_upcall;  /* caller completion callback */
/*
 * Async-args for OST_SYNC RPCs (see osc_sync_base/osc_sync_interpret).
 * NOTE(review): excerpt incomplete — fa_cookie and the closing brace
 * are referenced by the code below but not visible here.
 */
77 struct osc_fsync_args {
78 struct obd_info *fa_oi;          /* obd_info whose oa receives reply */
79 obd_enqueue_update_f fa_upcall;  /* caller completion callback */
/*
 * Async-args for DLM lock enqueue requests.
 * NOTE(review): excerpt drops lines (numbering jumps 84->88, 88->90);
 * several fields and the closing brace are not visible.
 */
83 struct osc_enqueue_args {
84 struct obd_export *oa_exp;       /* export the enqueue is sent on */
88 osc_enqueue_upcall_f oa_upcall;  /* caller completion callback */
90 struct ost_lvb *oa_lvb;          /* lock value block from server */
91 struct lustre_handle oa_lockh;   /* handle of the enqueued lock */
92 unsigned int oa_agl:1;           /* asynchronous glimpse lock flag */
/* Forward declarations for helpers defined later in this file.
 * NOTE(review): the brw_interpret() prototype continuation line is
 * missing from this excerpt. */
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Pack an optional capability into the request capsule's RMF_CAPA1 field
 * and flag its presence in the reply body (OBD_MD_FLOSSCAPA).
 * NOTE(review): excerpt drops lines (numbering jumps 103->108, 108->111);
 * the NULL-capa early return and the capa copy into 'c' are presumably
 * among them — confirm against the full source.
 */
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100 struct ost_body *body, void *capa)
102 struct obd_capa *oc = (struct obd_capa *)capa;
103 struct lustre_capa *c;
108 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
111 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112 DEBUG_CAPA(D_SEC, c, "pack");
/*
 * Fill the request's OST body from oinfo: convert the in-memory obdo to
 * wire format (honouring the import's connect data) and pack the capa.
 * NOTE(review): excerpt incomplete — intervening lines (e.g. the obdo
 * source argument of lustre_set_wire_obdo) are elided.
 */
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
117 struct ost_body *body;
119 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
122 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
124 osc_pack_capa(req, body, oinfo->oi_capa);
/*
 * Size the given capa field of the request capsule before packing:
 * zero when no capability is supplied, otherwise the default
 * sizeof(struct obd_capa) already set in the format.
 * NOTE(review): the conditional around the two branches is elided
 * from this excerpt.
 */
127 void osc_set_capa_size(struct ptlrpc_request *req,
128 const struct req_msg_field *field,
132 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
134 /* it is already calculated as sizeof struct obd_capa */
/*
 * Interpret callback for async OST_GETATTR: unpack the reply body into
 * aa->aa_oi->oi_oa and invoke the caller's oi_cb_up() with the result.
 * NOTE(review): excerpt drops lines (numbering jumps 142->148, 156->158);
 * the rc check and the success/failure branching around the unpack are
 * not fully visible.
 */
138 int osc_getattr_interpret(const struct lu_env *env,
139 struct ptlrpc_request *req,
140 struct osc_async_args *aa, int rc)
142 struct ost_body *body;
148 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
152 aa->aa_oi->oi_oa, &body->oa);
154 /* This should really be sent by the OST */
155 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
156 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* failure path: reply body could not be unpacked */
158 CDEBUG(D_INFO, "can't unpack ost_body\n");
160 aa->aa_oi->oi_oa->o_valid = 0;
163 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Synchronous getattr: build an OST_GETATTR request, wait for the reply,
 * and copy the returned attributes into oinfo->oi_oa.  The block size is
 * filled in locally from the import's BRW size since the OST does not
 * send it.
 * NOTE(review): excerpt drops lines (e.g. NULL-req check after alloc,
 * rc checks after pack/queue_wait, the second argument of
 * lustre_get_wire_obdo, RETURN paths).
 */
167 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
168 struct obd_info *oinfo)
170 struct ptlrpc_request *req;
171 struct ost_body *body;
175 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
179 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
180 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failed: release the unbuilt request */
182 ptlrpc_request_free(req);
186 osc_pack_req_body(req, oinfo);
188 ptlrpc_request_set_replen(req);
190 rc = ptlrpc_queue_wait(req);
194 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196 GOTO(out, rc = -EPROTO);
198 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
/* block size is a client-side fabrication, not sent by the OST */
202 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
203 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
207 ptlrpc_req_finished(req);
/*
 * Synchronous setattr: send OST_SETATTR with the attributes in
 * oinfo->oi_oa and read the (possibly updated) attributes back from the
 * reply.  Requires o_valid to carry OBD_MD_FLGROUP (asserted).
 * NOTE(review): excerpt drops the NULL/rc checks and RETURN paths
 * between the visible lines.
 */
211 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
212 struct obd_info *oinfo, struct obd_trans_info *oti)
214 struct ptlrpc_request *req;
215 struct ost_body *body;
219 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
221 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
225 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
226 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* pack failed: release the unbuilt request */
228 ptlrpc_request_free(req);
232 osc_pack_req_body(req, oinfo);
234 ptlrpc_request_set_replen(req);
236 rc = ptlrpc_queue_wait(req);
240 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
242 GOTO(out, rc = -EPROTO);
244 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
249 ptlrpc_req_finished(req);
/*
 * Interpret callback shared by async setattr and punch: unpack the reply
 * obdo into sa->sa_oa and forward rc to the caller via sa->sa_upcall.
 * NOTE(review): rc pre-check and the out label are elided from this
 * excerpt.
 */
253 static int osc_setattr_interpret(const struct lu_env *env,
254 struct ptlrpc_request *req,
255 struct osc_setattr_args *sa, int rc)
257 struct ost_body *body;
263 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
265 GOTO(out, rc = -EPROTO);
267 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
270 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous setattr: build an OST_SETATTR request, attach
 * osc_setattr_interpret() plus the caller's upcall/cookie, and hand the
 * request to ptlrpcd (or to the caller's rqset).
 * NOTE(review): the ptlrpcd_add_req() at source line 301 and the
 * rq_interpret_reply assignment below it are, in the full source, the
 * two arms of an if/else (fire-and-forget MDS->OST path vs. interpreted
 * path) whose condition and else lines are elided here — this is NOT a
 * double submission; confirm against the complete file.
 */
274 int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
275 obd_enqueue_update_f upcall, void *cookie,
276 struct ptlrpc_request_set *rqset)
278 struct ptlrpc_request *req;
279 struct osc_setattr_args *sa;
283 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
287 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
288 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
290 ptlrpc_request_free(req);
294 osc_pack_req_body(req, oinfo);
296 ptlrpc_request_set_replen(req);
298 /* do mds to ost setattr asynchronously */
300 /* Do not wait for response. */
301 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
303 req->rq_interpret_reply =
304 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async args must fit in the space reserved inside the request */
306 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
307 sa = ptlrpc_req_async_args(req);
308 sa->sa_oa = oinfo->oi_oa;
309 sa->sa_upcall = upcall;
310 sa->sa_cookie = cookie;
312 if (rqset == PTLRPCD_SET)
313 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
315 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous object create (echo-client path only — asserted via
 * fid_seq_is_echo).  Sends OST_CREATE, copies the created object's
 * attributes back into 'oa', and propagates the llog cookie into 'oti'
 * when the server provided one.
 * NOTE(review): excerpt drops lines throughout (NULL/rc checks, the
 * CDEBUG format string at source line 352, RETURN paths).
 */
321 static int osc_create(const struct lu_env *env, struct obd_export *exp,
322 struct obdo *oa, struct obd_trans_info *oti)
324 struct ptlrpc_request *req;
325 struct ost_body *body;
330 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
331 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
333 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
335 GOTO(out, rc = -ENOMEM);
337 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
339 ptlrpc_request_free(req);
343 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
346 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
348 ptlrpc_request_set_replen(req);
350 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
351 oa->o_flags == OBD_FL_DELORPHAN) {
353 "delorphan from OST integration");
354 /* Don't resend the delorphan req */
355 req->rq_no_resend = req->rq_no_delay = 1;
358 rc = ptlrpc_queue_wait(req);
362 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
364 GOTO(out_req, rc = -EPROTO);
366 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
367 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* block size is fabricated client-side from the import's BRW size */
369 oa->o_blksize = cli_brw_size(exp->exp_obd);
370 oa->o_valid |= OBD_MD_FLBLKSZ;
373 if (oa->o_valid & OBD_MD_FLCOOKIE) {
374 if (oti->oti_logcookies == NULL)
375 oti->oti_logcookies = &oti->oti_onecookie;
377 *oti->oti_logcookies = oa->o_lcookie;
381 CDEBUG(D_HA, "transno: "LPD64"\n",
382 lustre_msg_get_transno(req->rq_repmsg));
384 ptlrpc_req_finished(req);
/*
 * Asynchronous truncate/punch: build an OST_PUNCH request (on the I/O
 * portal, see bug 7198), reuse osc_setattr_interpret() as completion
 * handler, and submit via ptlrpcd or the caller's rqset.
 * NOTE(review): NULL/rc checks and RETURN paths are elided from this
 * excerpt.
 */
389 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
390 obd_enqueue_update_f upcall, void *cookie,
391 struct ptlrpc_request_set *rqset)
393 struct ptlrpc_request *req;
394 struct osc_setattr_args *sa;
395 struct ost_body *body;
399 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
403 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
404 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
406 ptlrpc_request_free(req);
409 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
410 ptlrpc_at_set_req_timeout(req);
412 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
414 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
416 osc_pack_capa(req, body, oinfo->oi_capa);
418 ptlrpc_request_set_replen(req);
420 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async args must fit in the space reserved inside the request */
421 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
422 sa = ptlrpc_req_async_args(req);
423 sa->sa_oa = oinfo->oi_oa;
424 sa->sa_upcall = upcall;
425 sa->sa_cookie = cookie;
426 if (rqset == PTLRPCD_SET)
427 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
429 ptlrpc_set_add_req(rqset, req);
/*
 * Interpret callback for OST_SYNC: copy the reply obdo back into the
 * caller's obd_info and invoke fa_upcall with the result.
 * NOTE(review): the rc pre-check and the void* 'arg' parameter line are
 * partially elided from this excerpt.
 */
434 static int osc_sync_interpret(const struct lu_env *env,
435 struct ptlrpc_request *req,
438 struct osc_fsync_args *fa = arg;
439 struct ost_body *body;
445 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
447 CERROR ("can't unpack ost_body\n");
448 GOTO(out, rc = -EPROTO);
451 *fa->fa_oi->oi_oa = body->oa;
453 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Asynchronous sync: build an OST_SYNC request — the oa's size/blocks
 * fields are overloaded to carry the start/end of the range — attach
 * osc_sync_interpret(), and submit via ptlrpcd or the caller's rqset.
 * NOTE(review): NULL/rc checks and RETURN paths are elided from this
 * excerpt.
 */
457 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
458 obd_enqueue_update_f upcall, void *cookie,
459 struct ptlrpc_request_set *rqset)
461 struct ptlrpc_request *req;
462 struct ost_body *body;
463 struct osc_fsync_args *fa;
467 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
471 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
472 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
474 ptlrpc_request_free(req);
478 /* overload the size and blocks fields in the oa with start/end */
479 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
481 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
483 osc_pack_capa(req, body, oinfo->oi_capa);
485 ptlrpc_request_set_replen(req);
486 req->rq_interpret_reply = osc_sync_interpret;
/* async args must fit in the space reserved inside the request */
488 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
489 fa = ptlrpc_req_async_args(req);
491 fa->fa_upcall = upcall;
492 fa->fa_cookie = cookie;
494 if (rqset == PTLRPCD_SET)
495 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
497 ptlrpc_set_add_req(rqset, req);
502 /* Find and cancel locally locks matched by @mode in the resource found by
503 * @objid. Found locks are added into @cancel list. Returns the amount of
504 * locks added to @cancels list. */
505 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
506 struct list_head *cancels,
507 ldlm_mode_t mode, __u64 lock_flags)
509 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
510 struct ldlm_res_id res_id;
511 struct ldlm_resource *res;
/* Early-lock-cancel (ELC) policy check: */
515 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
516 * export) but disabled through procfs (flag in NS).
518 * This distinguishes from a case when ELC is not supported originally,
519 * when we still want to cancel locks in advance and just cancel them
520 * locally, without sending any RPC. */
521 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* NOTE(review): the early RETURN and the IS_ERR(res) check after
 * ldlm_resource_get() are elided from this excerpt. */
524 ostid_build_res_name(&oa->o_oi, &res_id);
525 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
529 LDLM_RESOURCE_ADDREF(res);
530 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
531 lock_flags, 0, NULL);
532 LDLM_RESOURCE_DELREF(res);
533 ldlm_resource_putref(res);
/*
 * Interpret callback for async OST_DESTROY: drop the in-flight destroy
 * counter and wake any thread throttled in osc_destroy() waiting for a
 * destroy slot (see osc_can_send_destroy()).
 */
537 static int osc_destroy_interpret(const struct lu_env *env,
538 struct ptlrpc_request *req, void *data,
541 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
543 atomic_dec(&cli->cl_destroy_in_flight);
544 wake_up(&cli->cl_destroy_waitq);
/*
 * Optimistically reserve a destroy-RPC slot: increment the in-flight
 * counter and keep the slot if it is within cl_max_rpcs_in_flight,
 * otherwise back out and wake a waiter that may have been raced.
 * NOTE(review): the return statements of both branches are elided from
 * this excerpt.
 */
548 static int osc_can_send_destroy(struct client_obd *cli)
550 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
551 cli->cl_max_rpcs_in_flight) {
552 /* The destroy request can be sent */
/* over the limit: undo the reservation */
555 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
556 cli->cl_max_rpcs_in_flight) {
558 * The counter has been modified between the two atomic
561 wake_up(&cli->cl_destroy_waitq);
566 /* Destroy requests can be async always on the client, and we don't even really
567 * care about the return code since the client cannot do anything at all about
569 * When the MDS is unlinking a filename, it saves the file objects into a
570 * recovery llog, and these object records are cancelled when the OST reports
571 * they were destroyed and sync'd to disk (i.e. transaction committed).
572 * If the client dies, or the OST is down when the object should be destroyed,
573 * the records are not cancelled, and when the OST reconnects to the MDS next,
574 * it will retrieve the llog unlink logs and then sends the log cancellation
575 * cookies to the MDS after committing destroy transactions. */
/* NOTE(review): excerpt drops lines throughout this function (NULL
 * checks, RETURN paths, and the lock-count argument to
 * ldlm_prep_elc_req); read against the full source before changing. */
576 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
577 struct obdo *oa, struct obd_trans_info *oti)
579 struct client_obd *cli = &exp->exp_obd->u.cli;
580 struct ptlrpc_request *req;
581 struct ost_body *body;
582 struct list_head cancels = LIST_HEAD_INIT(cancels);
587 CDEBUG(D_INFO, "oa NULL\n");
/* cancel conflicting PW locks locally and piggyback them on the RPC */
591 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
592 LDLM_FL_DISCARD_DATA);
594 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
596 ldlm_lock_list_put(&cancels, l_bl_ast, count);
600 osc_set_capa_size(req, &RMF_CAPA1, NULL);
601 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
604 ptlrpc_request_free(req);
608 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
609 ptlrpc_at_set_req_timeout(req);
611 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
612 oa->o_lcookie = *oti->oti_logcookies;
613 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
615 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
617 ptlrpc_request_set_replen(req);
619 /* If osc_destory is for destroying the unlink orphan,
620 * sent from MDT to OST, which should not be blocked here,
621 * because the process might be triggered by ptlrpcd, and
622 * it is not good to block ptlrpcd thread (b=16006)*/
623 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
624 req->rq_interpret_reply = osc_destroy_interpret;
625 if (!osc_can_send_destroy(cli)) {
626 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
/* throttle: at most cl_max_rpcs_in_flight destroys outstanding */
630 * Wait until the number of on-going destroy RPCs drops
631 * under max_rpc_in_flight
633 l_wait_event_exclusive(cli->cl_destroy_waitq,
634 osc_can_send_destroy(cli), &lwi);
638 /* Do not wait for response */
639 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Fill the oa's dirty/undirty/grant/dropped accounting fields, under
 * cl_loi_list_lock, so the server can manage this client's grant.  The
 * sanity CERRORs guard against accounting corruption; they tolerate a
 * small race on the global obd_dirty counters (+1 fudge).
 * NOTE(review): excerpt drops lines (the 'long writing_bytes' parameter,
 * branch closings, the o_valid |= bits update); numbering jumps
 * 646->648, 669->671, 674->677.
 */
643 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
646 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
648 LASSERT(!(oa->o_valid & bits));
651 spin_lock(&cli->cl_loi_list_lock);
652 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
653 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
654 cli->cl_dirty_max_pages)) {
655 CERROR("dirty %lu - %lu > dirty_max %lu\n",
656 cli->cl_dirty_pages, cli->cl_dirty_transit,
657 cli->cl_dirty_max_pages);
659 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
660 atomic_long_read(&obd_dirty_transit_pages) >
661 (obd_max_dirty_pages + 1))) {
662 /* The atomic_read() allowing the atomic_inc() are
663 * not covered by a lock thus they may safely race and trip
664 * this CERROR() unless we add in a small fudge factor (+1). */
665 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
666 cli->cl_import->imp_obd->obd_name,
667 atomic_long_read(&obd_dirty_pages),
668 atomic_long_read(&obd_dirty_transit_pages),
669 obd_max_dirty_pages);
671 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
673 CERROR("dirty %lu - dirty_max %lu too big???\n",
674 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
/* normal path: report how much more this client could dirty */
677 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
679 (cli->cl_max_rpcs_in_flight + 1);
680 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
683 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
684 oa->o_dropped = cli->cl_lost_grant;
685 cli->cl_lost_grant = 0;
686 spin_unlock(&cli->cl_loi_list_lock);
687 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
688 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Schedule the next grant-shrink check cl_grant_shrink_interval seconds
 * from now (see osc_should_shrink_grant()).
 */
692 void osc_update_next_shrink(struct client_obd *cli)
694 cli->cl_next_shrink_grant =
695 cfs_time_shift(cli->cl_grant_shrink_interval);
696 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
697 cli->cl_next_shrink_grant);
/* Add 'grant' bytes back to the client's available grant, under the
 * cl_loi_list_lock that protects all grant accounting. */
700 static void __osc_update_grant(struct client_obd *cli, u64 grant)
702 spin_lock(&cli->cl_loi_list_lock);
703 cli->cl_avail_grant += grant;
704 spin_unlock(&cli->cl_loi_list_lock);
/* Apply any extra grant the server returned in a reply body
 * (only when OBD_MD_FLGRANT is set in o_valid). */
707 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
709 if (body->oa.o_valid & OBD_MD_FLGRANT) {
710 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
711 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: needed by osc_shrink_grant_to_target() below,
 * which sends the shrink as a KEY_GRANT_SHRINK set_info RPC. */
715 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
716 u32 keylen, void *key,
717 u32 vallen, void *val,
718 struct ptlrpc_request_set *set);
/*
 * Interpret callback for a grant-shrink set_info RPC: on failure give
 * the shrunk grant back locally, on success apply whatever grant the
 * server's reply body carries.
 * NOTE(review): the rc check between the two paths and the cleanup of
 * the aa obdo are elided from this excerpt.
 */
720 static int osc_shrink_grant_interpret(const struct lu_env *env,
721 struct ptlrpc_request *req,
724 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
725 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
726 struct ost_body *body;
/* failure: restore the grant we tried to return to the server */
729 __osc_update_grant(cli, oa->o_grant);
733 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
735 osc_update_grant(cli, body);
/*
 * Piggyback a grant shrink on an outgoing BRW: move a quarter of the
 * available grant into oa->o_grant, tag the oa with
 * OBD_FL_SHRINK_GRANT, and reschedule the next shrink.
 * NOTE(review): the o_flags initialisation inside the FLFLAGS branch is
 * partially elided (numbering jumps 748->751).
 */
741 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
743 spin_lock(&cli->cl_loi_list_lock);
744 oa->o_grant = cli->cl_avail_grant / 4;
745 cli->cl_avail_grant -= oa->o_grant;
746 spin_unlock(&cli->cl_loi_list_lock);
747 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
748 oa->o_valid |= OBD_MD_FLFLAGS;
751 oa->o_flags |= OBD_FL_SHRINK_GRANT;
752 osc_update_next_shrink(cli);
755 /* Shrink the current grant, either from some large amount to enough for a
756 * full set of in-flight RPCs, or if we have already shrunk to that limit
757 * then to enough for a single RPC. This avoids keeping more grant than
758 * needed, and avoids shrinking the grant piecemeal. */
759 static int osc_shrink_grant(struct client_obd *cli)
761 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
762 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
/* already at the in-flight target: drop to a single-RPC's worth */
764 spin_lock(&cli->cl_loi_list_lock);
765 if (cli->cl_avail_grant <= target_bytes)
766 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
767 spin_unlock(&cli->cl_loi_list_lock);
769 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Return grant above 'target_bytes' to the server via a KEY_GRANT_SHRINK
 * set_info RPC; never shrinks below one RPC's worth.  On send failure
 * the grant is restored locally.
 * NOTE(review): excerpt drops lines (the body allocation around source
 * line 790-794, RETURN paths, and the OBD_FREE of the body).
 */
772 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
775 struct ost_body *body;
778 spin_lock(&cli->cl_loi_list_lock);
779 /* Don't shrink if we are already above or below the desired limit
780 * We don't want to shrink below a single RPC, as that will negatively
781 * impact block allocation and long-term performance. */
782 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
783 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
785 if (target_bytes >= cli->cl_avail_grant) {
786 spin_unlock(&cli->cl_loi_list_lock);
789 spin_unlock(&cli->cl_loi_list_lock);
795 osc_announce_cached(cli, &body->oa, 0);
797 spin_lock(&cli->cl_loi_list_lock);
798 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
799 cli->cl_avail_grant = target_bytes;
800 spin_unlock(&cli->cl_loi_list_lock);
801 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
802 body->oa.o_valid |= OBD_MD_FLFLAGS;
803 body->oa.o_flags = 0;
805 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
806 osc_update_next_shrink(cli);
808 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
809 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
810 sizeof(*body), body, NULL);
/* send failed: take the grant back */
812 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to shrink this client's grant: requires the
 * server to support OBD_CONNECT_GRANT_SHRINK, the shrink deadline to
 * have (nearly) passed, a FULL import, and more grant than one RPC.
 * NOTE(review): the return statements of the individual branches are
 * elided from this excerpt.
 */
817 static int osc_should_shrink_grant(struct client_obd *client)
819 cfs_time_t time = cfs_time_current();
820 cfs_time_t next_shrink = client->cl_next_shrink_grant;
822 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
823 OBD_CONNECT_GRANT_SHRINK) == 0)
826 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
827 /* Get the current RPC size directly, instead of going via:
828 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
829 * Keep comment here so that it can be found by searching. */
830 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
832 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
833 client->cl_avail_grant > brw_size)
/* not worth shrinking now; try again next interval */
836 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk every client on the timeout item's
 * list and shrink the grant of those that are due.
 */
841 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
843 struct client_obd *client;
845 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
846 if (osc_should_shrink_grant(client))
847 osc_shrink_grant(client);
/*
 * Register this client with the ptlrpc timeout machinery so
 * osc_grant_shrink_grant_cb() fires every cl_grant_shrink_interval.
 * NOTE(review): the timeout-event enum argument and the rc check/RETURN
 * lines are elided from this excerpt.
 */
852 static int osc_add_shrink_grant(struct client_obd *client)
856 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
858 osc_grant_shrink_grant_cb, NULL,
859 &client->cl_grant_shrink_list);
861 CERROR("add grant client %s error %d\n",
862 client->cl_import->imp_obd->obd_name, rc);
865 CDEBUG(D_CACHE, "add grant client %s \n",
866 client->cl_import->imp_obd->obd_name);
867 osc_update_next_shrink(client);
/* Unregister this client from the grant-shrink timeout list.
 * NOTE(review): the event-type argument line is elided here. */
871 static int osc_del_shrink_grant(struct client_obd *client)
873 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialise grant accounting from the server's connect data: after an
 * eviction ocd_grant is taken as-is, otherwise outstanding dirty pages
 * are subtracted.  Also derives the extent chunk size and, when the
 * server supports it, registers for periodic grant shrinking.
 * NOTE(review): excerpt drops lines (opening comment fragment, the bug
 * number in the workaround comment, numbering jumps 886->888, 899->901).
 */
877 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
880 * ocd_grant is the total grant amount we're expect to hold: if we've
881 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
882 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
885 * race is tolerable here: if we're evicted, but imp_state already
886 * left EVICTED state, then cl_dirty_pages must be 0 already.
888 spin_lock(&cli->cl_loi_list_lock);
889 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
890 cli->cl_avail_grant = ocd->ocd_grant;
892 cli->cl_avail_grant = ocd->ocd_grant -
893 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
895 if (cli->cl_avail_grant < 0) {
896 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
897 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
898 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
899 /* workaround for servers which do not have the patch from
901 cli->cl_avail_grant = ocd->ocd_grant;
904 /* determine the appropriate chunk size used by osc_extent. */
905 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
906 spin_unlock(&cli->cl_loi_list_lock);
908 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
909 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
910 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
912 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
913 list_empty(&cli->cl_grant_shrink_list))
914 osc_add_shrink_grant(cli);
917 /* We assume that the reason this OSC got a short read is because it read
918 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
919 * via the LOV, and it _knows_ it's reading inside the file, it's just that
920 * this stripe never got written at or beyond this stripe offset yet. */
/* NOTE(review): excerpt drops lines (local declarations of i/ptr, the
 * kunmap calls paired with each kmap, loop-advance statements); do not
 * assume a missing kunmap from this view alone. */
921 static void handle_short_read(int nob_read, size_t page_count,
922 struct brw_page **pga)
927 /* skip bytes read OK */
928 while (nob_read > 0) {
929 LASSERT (page_count > 0);
931 if (pga[i]->count > nob_read) {
932 /* EOF inside this page */
933 ptr = kmap(pga[i]->pg) +
934 (pga[i]->off & ~PAGE_MASK);
935 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
942 nob_read -= pga[i]->count;
947 /* zero remaining pages */
948 while (page_count-- > 0) {
949 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
950 memset(ptr, 0, pga[i]->count);
/*
 * Validate a BRW_WRITE reply: the per-niobuf RC vector must be present,
 * contain no errors (negative entries are returned to the caller) and
 * no unexpected non-zero values, and the bulk must have transferred
 * exactly the requested byte count.
 * NOTE(review): several return statements and the niocount argument to
 * the sized_get are elided from this excerpt.
 */
956 static int check_write_rcs(struct ptlrpc_request *req,
957 int requested_nob, int niocount,
958 size_t page_count, struct brw_page **pga)
963 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
964 sizeof(*remote_rcs) *
966 if (remote_rcs == NULL) {
967 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
971 /* return error if any niobuf was in error */
972 for (i = 0; i < niocount; i++) {
973 if ((int)remote_rcs[i] < 0)
974 return(remote_rcs[i]);
976 if (remote_rcs[i] != 0) {
977 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
978 i, remote_rcs[i], req);
983 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
984 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
985 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages may share one remote niobuf only if they are physically
 * contiguous (p1 ends where p2 starts) and their flags agree modulo the
 * bits known to be safe to mix (grant/cache/sync hints).  Unknown flag
 * differences are merged anyway but warned about.
 * NOTE(review): the 'return 0' for incompatible flags is elided from
 * this excerpt.
 */
992 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
994 if (p1->flag != p2->flag) {
995 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
996 OBD_BRW_SYNC | OBD_BRW_ASYNC |
997 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
999 /* warn if we try to combine flags that we don't know to be
1000 * safe to combine */
1001 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1002 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1003 "report this at https://jira.hpdd.intel.com/\n",
1004 p1->flag, p2->flag);
1009 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk checksum over the first 'nob' bytes of the page
 * array using the libcfs crypto hash selected by cksum_type.  Contains
 * two fault-injection hooks: corrupting received READ data and
 * deliberately mangling the computed WRITE checksum.
 * NOTE(review): excerpt drops lines (i/err/cksum declarations, kunmap
 * after the fault-injection kmap, loop-advance, the final cksum
 * adjustment and return); numbering jumps 1014->1018, 1061->(end).
 */
1012 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1013 struct brw_page **pga, int opc,
1014 cksum_type_t cksum_type)
1018 struct cfs_crypto_hash_desc *hdesc;
1019 unsigned int bufsize;
1021 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1023 LASSERT(pg_count > 0);
1025 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1026 if (IS_ERR(hdesc)) {
1027 CERROR("Unable to initialize checksum hash %s\n",
1028 cfs_crypto_hash_name(cfs_alg));
1029 return PTR_ERR(hdesc);
1032 while (nob > 0 && pg_count > 0) {
1033 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1035 /* corrupt the data before we compute the checksum, to
1036 * simulate an OST->client data error */
1037 if (i == 0 && opc == OST_READ &&
1038 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1039 unsigned char *ptr = kmap(pga[i]->pg);
1040 int off = pga[i]->off & ~PAGE_MASK;
1042 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1045 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1046 pga[i]->off & ~PAGE_MASK,
1048 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1049 (int)(pga[i]->off & ~PAGE_MASK));
1051 nob -= pga[i]->count;
1056 bufsize = sizeof(cksum);
1057 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1059 /* For sending we only compute the wrong checksum instead
1060 * of corrupting the data so it is still correct on a redo */
1061 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a complete BRW (bulk read/write) request from a page array:
 * allocates the request (from the pre-allocated pool for writes, so
 * writeout cannot deadlock on memory), merges contiguous pages into
 * niobufs, prepares the bulk descriptor, packs obdo/ioobj/capa,
 * announces cached/dirty state for grant management, optionally
 * computes a bulk checksum, and stashes osc_brw_async_args in the
 * request for brw_interpret().  *reqp receives the prepared request.
 * NOTE(review): excerpt drops many lines (return-type line, NULL
 * checks after allocs, aa->aa_oa assignment, resend handling details,
 * RETURN paths); numbering jumps throughout — read against the full
 * source before modifying.
 */
1068 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1069 u32 page_count, struct brw_page **pga,
1070 struct ptlrpc_request **reqp, struct obd_capa *ocapa,
1071 int reserve, int resend)
1073 struct ptlrpc_request *req;
1074 struct ptlrpc_bulk_desc *desc;
1075 struct ost_body *body;
1076 struct obd_ioobj *ioobj;
1077 struct niobuf_remote *niobuf;
1078 int niocount, i, requested_nob, opc, rc;
1079 struct osc_brw_async_args *aa;
1080 struct req_capsule *pill;
1081 struct brw_page *pg_prev;
/* fault-injection hooks for testing BRW failure paths */
1084 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1085 RETURN(-ENOMEM); /* Recoverable */
1086 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1087 RETURN(-EINVAL); /* Fatal */
1089 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes come from the request pool so memory pressure can't block
 * writeout */
1091 req = ptlrpc_request_alloc_pool(cli->cl_import,
1092 cli->cl_import->imp_rq_pool,
1093 &RQF_OST_BRW_WRITE);
1096 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count how many niobufs are needed: one per run of mergeable pages */
1101 for (niocount = i = 1; i < page_count; i++) {
1102 if (!can_merge_pages(pga[i - 1], pga[i]))
1106 pill = &req->rq_pill;
1107 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1109 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1110 niocount * sizeof(*niobuf));
1111 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1113 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1115 ptlrpc_request_free(req);
1118 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1119 ptlrpc_at_set_req_timeout(req);
1120 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1122 req->rq_no_retry_einprogress = 1;
1124 desc = ptlrpc_prep_bulk_imp(req, page_count,
1125 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1126 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1130 GOTO(out, rc = -ENOMEM);
1131 /* NB request now owns desc and will free it when it gets freed */
1133 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1134 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1135 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1136 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1138 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1140 obdo_to_ioobj(oa, ioobj);
1141 ioobj->ioo_bufcnt = niocount;
1142 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1143 * that might be send for this request. The actual number is decided
1144 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1145 * "max - 1" for old client compatibility sending "0", and also so the
1146 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1147 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1148 osc_pack_capa(req, body, ocapa);
1149 LASSERT(page_count > 0);
1151 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1152 struct brw_page *pg = pga[i];
1153 int poff = pg->off & ~PAGE_MASK;
1155 LASSERT(pg->count > 0);
1156 /* make sure there is no gap in the middle of page array */
1157 LASSERTF(page_count == 1 ||
1158 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1159 ergo(i > 0 && i < page_count - 1,
1160 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1161 ergo(i == page_count - 1, poff == 0)),
1162 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1163 i, page_count, pg, pg->off, pg->count);
1164 LASSERTF(i == 0 || pg->off > pg_prev->off,
1165 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1166 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1168 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1169 pg_prev->pg, page_private(pg_prev->pg),
1170 pg_prev->pg->index, pg_prev->off);
1171 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1172 (pg->flag & OBD_BRW_SRVLOCK));
1174 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1175 requested_nob += pg->count;
/* extend the previous niobuf when contiguous, else start a new one */
1177 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1179 niobuf->rnb_len += pg->count;
1181 niobuf->rnb_offset = pg->off;
1182 niobuf->rnb_len = pg->count;
1183 niobuf->rnb_flags = pg->flag;
/* sanity: we must have filled exactly niocount niobufs */
1188 LASSERTF((void *)(niobuf - niocount) ==
1189 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1190 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1191 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1193 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1195 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1196 body->oa.o_valid |= OBD_MD_FLFLAGS;
1197 body->oa.o_flags = 0;
/* tag resends so the server can detect replayed bulk */
1199 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1202 if (osc_should_shrink_grant(cli))
1203 osc_shrink_grant_local(cli, &body->oa);
1205 /* size[REQ_REC_OFF] still sizeof (*body) */
1206 if (opc == OST_WRITE) {
1207 if (cli->cl_checksum &&
1208 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1209 /* store cl_cksum_type in a local variable since
1210 * it can be changed via lprocfs */
1211 cksum_type_t cksum_type = cli->cl_cksum_type;
1213 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1214 oa->o_flags &= OBD_FL_LOCAL_MASK;
1215 body->oa.o_flags = 0;
1217 body->oa.o_flags |= cksum_type_pack(cksum_type);
1218 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1219 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1223 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1225 /* save this in 'oa', too, for later checking */
1226 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1227 oa->o_flags |= cksum_type_pack(cksum_type);
1229 /* clear out the checksum flag, in case this is a
1230 * resend but cl_checksum is no longer set. b=11238 */
1231 oa->o_valid &= ~OBD_MD_FLCKSUM;
1233 oa->o_cksum = body->oa.o_cksum;
1234 /* 1 RC per niobuf */
1235 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1236 sizeof(__u32) * niocount);
/* read path: request a server-side checksum if enabled */
1238 if (cli->cl_checksum &&
1239 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1240 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1241 body->oa.o_flags = 0;
1242 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1243 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1246 ptlrpc_request_set_replen(req);
1248 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1249 aa = ptlrpc_req_async_args(req);
1251 aa->aa_requested_nob = requested_nob;
1252 aa->aa_nio_count = niocount;
1253 aa->aa_page_count = page_count;
1257 INIT_LIST_HEAD(&aa->aa_oaps);
1258 if (ocapa && reserve)
1259 aa->aa_ocapa = capa_get(ocapa);
1262 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1263 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1264 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1265 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1269 ptlrpc_req_finished(req);
/*
 * Verify a bulk-write checksum returned by the server against the value
 * the client computed before sending.  On mismatch, re-checksum the
 * pages locally to classify where the data changed, and emit a console
 * error naming the object and extent.
 * NOTE(review): several lines are elided in this view (declarations and
 * return paths are not visible) -- confirm details against full source.
 */
1273 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1274 __u32 client_cksum, __u32 server_cksum, int nob,
1275 size_t page_count, struct brw_page **pga,
1276 cksum_type_t client_cksum_type)
1280 cksum_type_t cksum_type;
/* Fast path: the server confirmed the checksum we sent. */
1282 if (server_cksum == client_cksum) {
1283 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute with the checksum type the server actually used, which may
 * differ from the type originally requested. */
1287 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1289 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch for the diagnostic message below. */
1292 if (cksum_type != client_cksum_type)
1293 msg = "the server did not use the checksum type specified in "
1294 "the original request - likely a protocol problem";
1295 else if (new_cksum == server_cksum)
1296 msg = "changed on the client after we checksummed it - "
1297 "likely false positive due to mmap IO (bug 11742)";
1298 else if (new_cksum == client_cksum)
1299 msg = "changed in transit before arrival at OST";
1301 msg = "changed in transit AND doesn't match the original - "
1302 "likely false positive due to mmap IO (bug 11742)";
/* Parent FID fields are only meaningful when OBD_MD_FLFID is set. */
1304 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1305 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1306 msg, libcfs_nid2str(peer->nid),
1307 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1308 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1309 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1310 POSTID(&oa->o_oi), pga[0]->off,
1311 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1312 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1313 "client csum now %x\n", client_cksum, client_cksum_type,
1314 server_cksum, cksum_type, new_cksum);
/*
 * Finish a bulk BRW RPC reply: update per-uid/gid quota flags and the
 * grant from the reply body; for writes, verify the write checksum and
 * the per-niobuf return codes; for reads, handle short reads and verify
 * the returned data checksum.  Finally copy the reply attributes back
 * into the caller's obdo.
 * NOTE(review): lines are elided in this view, so not all error paths
 * and GOTO targets are visible -- confirm against full source.
 */
1318 /* Note rc enters this function as number of bytes transferred */
1319 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1321 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1322 const lnet_process_id_t *peer =
1323 &req->rq_import->imp_connection->c_peer;
1324 struct client_obd *cli = aa->aa_cli;
1325 struct ost_body *body;
1326 u32 client_cksum = 0;
1329 if (rc < 0 && rc != -EDQUOT) {
1330 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1334 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1335 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1337 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1341 /* set/clear over quota flag for a uid/gid */
1342 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1343 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1344 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1346 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1347 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1349 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1352 osc_update_grant(cli, body);
/* Save the checksum we sent (if any) so the read path below and the
 * write verification can compare against it. */
1357 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1358 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* Write replies carry no bulk data back; rc should not be positive. */
1360 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1362 CERROR("Unexpected +ve rc %d\n", rc);
1365 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1367 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1370 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1371 check_write_checksum(&body->oa, peer, client_cksum,
1372 body->oa.o_cksum, aa->aa_requested_nob,
1373 aa->aa_page_count, aa->aa_ppga,
1374 cksum_type_unpack(aa->aa_oa->o_flags)))
1377 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1378 aa->aa_page_count, aa->aa_ppga);
1382 /* The rest of this function executes only for OST_READs */
1384 /* if unwrap_bulk failed, return -EAGAIN to retry */
1385 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1387 GOTO(out, rc = -EAGAIN);
1389 if (rc > aa->aa_requested_nob) {
1390 CERROR("Unexpected rc %d (%d requested)\n", rc,
1391 aa->aa_requested_nob);
1395 if (rc != req->rq_bulk->bd_nob_transferred) {
1396 CERROR ("Unexpected rc %d (%d transferred)\n",
1397 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the pages past what the server returned. */
1401 if (rc < aa->aa_requested_nob)
1402 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1404 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1405 static int cksum_counter;
1406 u32 server_cksum = body->oa.o_cksum;
1409 cksum_type_t cksum_type;
1411 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1412 body->oa.o_flags : 0);
1413 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1414 aa->aa_ppga, OST_READ,
/* Note when the bulk actually came via a router, for the message. */
1417 if (peer->nid != req->rq_bulk->bd_sender) {
1419 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1422 if (server_cksum != client_cksum) {
1423 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1424 "%s%s%s inode "DFID" object "DOSTID
1425 " extent ["LPU64"-"LPU64"]\n",
1426 req->rq_import->imp_obd->obd_name,
1427 libcfs_nid2str(peer->nid),
1429 body->oa.o_valid & OBD_MD_FLFID ?
1430 body->oa.o_parent_seq : (__u64)0,
1431 body->oa.o_valid & OBD_MD_FLFID ?
1432 body->oa.o_parent_oid : 0,
1433 body->oa.o_valid & OBD_MD_FLFID ?
1434 body->oa.o_parent_ver : 0,
1435 POSTID(&body->oa.o_oi),
1436 aa->aa_ppga[0]->off,
1437 aa->aa_ppga[aa->aa_page_count-1]->off +
1438 aa->aa_ppga[aa->aa_page_count-1]->count -
1440 CERROR("client %x, server %x, cksum_type %x\n",
1441 client_cksum, server_cksum, cksum_type);
1443 aa->aa_oa->o_cksum = client_cksum;
1447 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1450 } else if (unlikely(client_cksum)) {
1451 static int cksum_missed;
/* Rate-limited (power-of-two counter) complaint: we asked for a
 * checksum but the server did not send one. */
1454 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1455 CERROR("Checksum %u requested from %s but not sent\n",
1456 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy updated attributes from the reply back to the caller's obdo. */
1462 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1463 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW RPC after a recoverable error.  A fresh
 * request is prepared from the saved async args; the oaps and extents
 * are moved from the old request onto the new one, which inherits the
 * old request's interpret/commit callbacks and import generation.
 * NOTE(review): lines are elided in this view; error/return paths are
 * not fully visible -- confirm against full source.
 */
1468 static int osc_brw_redo_request(struct ptlrpc_request *request,
1469 struct osc_brw_async_args *aa, int rc)
1471 struct ptlrpc_request *new_req;
1472 struct osc_brw_async_args *new_aa;
1473 struct osc_async_page *oap;
1476 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1477 "redo for recoverable error %d", rc);
1479 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1480 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1481 aa->aa_cli, aa->aa_oa,
1482 aa->aa_page_count, aa->aa_ppga,
1483 &new_req, aa->aa_ocapa, 0, 1);
/* Abort the redo if any page in the old request was interrupted. */
1487 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1488 if (oap->oap_request != NULL) {
1489 LASSERTF(request == oap->oap_request,
1490 "request %p != oap_request %p\n",
1491 request, oap->oap_request);
1492 if (oap->oap_interrupted) {
1493 ptlrpc_req_finished(new_req);
1498 /* New request takes over pga and oaps from old request.
1499 * Note that copying a list_head doesn't work, need to move it... */
1501 new_req->rq_interpret_reply = request->rq_interpret_reply;
1502 new_req->rq_async_args = request->rq_async_args;
1503 new_req->rq_commit_cb = request->rq_commit_cb;
1504 /* cap resend delay to the current request timeout, this is similar to
1505 * what ptlrpc does (see after_reply()) */
1506 if (aa->aa_resends > new_req->rq_timeout)
1507 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1509 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1510 new_req->rq_generation_set = 1;
1511 new_req->rq_import_generation = request->rq_import_generation;
1513 new_aa = ptlrpc_req_async_args(new_req);
/* list_splice_init() moves the entries; a plain struct copy of a
 * list_head would leave dangling prev/next pointers. */
1515 INIT_LIST_HEAD(&new_aa->aa_oaps);
1516 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1517 INIT_LIST_HEAD(&new_aa->aa_exts);
1518 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1519 new_aa->aa_resends = aa->aa_resends;
/* Re-point each oap's request reference at the new request. */
1521 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1522 if (oap->oap_request) {
1523 ptlrpc_req_finished(oap->oap_request);
1524 oap->oap_request = ptlrpc_request_addref(new_req);
/* Ownership of the capability moves to the new request. */
1528 new_aa->aa_ocapa = aa->aa_ocapa;
1529 aa->aa_ocapa = NULL;
1531 /* XXX: This code will run into problem if we're going to support
1532 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1533 * and wait for all of them to be finished. We should inherit request
1534 * set from old request. */
1535 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1537 DEBUG_REQ(D_INFO, new_req, "new request");
1542 * ugh, we want disk allocation on the target to happen in offset order. we'll
1543 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1544 * fine for our small page arrays and doesn't require allocation. its an
1545 * insertion sort that swaps elements that are strides apart, shrinking the
1546 * stride down until its '1' and the array is sorted.
1548 static void sort_brw_pages(struct brw_page **array, int num)
1551 struct brw_page *tmp;
/* Build the largest gap of the 3x+1 sequence (1, 4, 13, 40, ...). */
1555 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1560 for (i = stride ; i < num ; i++) {
/* Gapped insertion: shift larger-offset entries up by one stride. */
1563 while (j >= stride && array[j - stride]->off > tmp->off) {
1564 array[j] = array[j - stride];
1569 } while (stride > 1);
/* Free the brw_page pointer array built for a BRW RPC.  Only the array
 * itself is freed here; the pages it points to are owned elsewhere. */
1572 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1574 LASSERT(ppga != NULL);
1575 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Interpret callback for a BRW RPC.  Finishes the request through
 * osc_brw_fini_request(), retries recoverable errors via
 * osc_brw_redo_request(), updates the cached cl_object attributes
 * (blocks, timestamps, size/KMS for writes) from the reply, completes
 * all attached extents, and finally decrements the in-flight counter
 * and re-plugs the IO queue.
 * NOTE(review): lines are elided in this view (e.g. the condition that
 * guards the attribute-update branch) -- confirm against full source.
 */
1578 static int brw_interpret(const struct lu_env *env,
1579 struct ptlrpc_request *req, void *data, int rc)
1581 struct osc_brw_async_args *aa = data;
1582 struct osc_extent *ext;
1583 struct osc_extent *tmp;
1584 struct client_obd *cli = aa->aa_cli;
1587 rc = osc_brw_fini_request(req, rc);
1588 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1589 /* When server return -EINPROGRESS, client should always retry
1590 * regardless of the number of times the bulk was resent already. */
1591 if (osc_recoverable_error(rc)) {
1592 if (req->rq_import_generation !=
1593 req->rq_import->imp_generation) {
1594 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1595 ""DOSTID", rc = %d.\n",
1596 req->rq_import->imp_obd->obd_name,
1597 POSTID(&aa->aa_oa->o_oi), rc);
1598 } else if (rc == -EINPROGRESS ||
1599 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1600 rc = osc_brw_redo_request(req, aa, rc);
1602 CERROR("%s: too many resent retries for object: "
1603 ""LPU64":"LPU64", rc = %d.\n",
1604 req->rq_import->imp_obd->obd_name,
1605 POSTID(&aa->aa_oa->o_oi), rc);
1610 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1615 capa_put(aa->aa_ocapa);
1616 aa->aa_ocapa = NULL;
/* Fold reply attributes into the cached cl_object attributes. */
1620 struct obdo *oa = aa->aa_oa;
1621 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1622 unsigned long valid = 0;
1623 struct cl_object *obj;
1624 struct osc_async_page *last;
1626 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1627 obj = osc2cl(last->oap_obj);
1629 cl_object_attr_lock(obj);
1630 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1631 attr->cat_blocks = oa->o_blocks;
1632 valid |= CAT_BLOCKS;
1634 if (oa->o_valid & OBD_MD_FLMTIME) {
1635 attr->cat_mtime = oa->o_mtime;
1638 if (oa->o_valid & OBD_MD_FLATIME) {
1639 attr->cat_atime = oa->o_atime;
1642 if (oa->o_valid & OBD_MD_FLCTIME) {
1643 attr->cat_ctime = oa->o_ctime;
1647 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1648 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1649 loff_t last_off = last->oap_count + last->oap_obj_off +
1652 /* Change file size if this is an out of quota or
1653 * direct IO write and it extends the file size */
1654 if (loi->loi_lvb.lvb_size < last_off) {
1655 attr->cat_size = last_off;
1658 /* Extend KMS if it's not a lockless write */
1659 if (loi->loi_kms < last_off &&
1660 oap2osc_page(last)->ops_srvlock == 0) {
1661 attr->cat_kms = last_off;
1667 cl_object_attr_update(env, obj, attr, valid);
1668 cl_object_attr_unlock(obj);
1670 OBDO_FREE(aa->aa_oa);
/* Successful writes leave pages "unstable" until the server commits;
 * brw_commit() (rq_commit_cb) releases this accounting. */
1672 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1673 osc_inc_unstable_pages(req);
1675 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1676 list_del_init(&ext->oe_link);
1677 osc_extent_finish(env, ext, 1, rc);
1679 LASSERT(list_empty(&aa->aa_exts));
1680 LASSERT(list_empty(&aa->aa_oaps));
1682 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1683 req->rq_bulk->bd_nob_transferred);
1684 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1685 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1687 spin_lock(&cli->cl_loi_list_lock);
1688 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1689 * is called so we know whether to go to sync BRWs or wait for more
1690 * RPCs to complete */
1691 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1692 cli->cl_w_in_flight--;
1694 cli->cl_r_in_flight--;
1695 osc_wake_cache_waiters(cli);
1696 spin_unlock(&cli->cl_loi_list_lock);
1698 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * rq_commit_cb for BRW requests: runs when the server has committed the
 * transaction.  Uses the rq_unstable flag under rq_lock so the unstable
 * page accounting is released exactly once even when racing with
 * osc_extent_finish (see comment below).
 */
1702 static void brw_commit(struct ptlrpc_request *req)
1704 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1705 * this called via the rq_commit_cb, I need to ensure
1706 * osc_dec_unstable_pages is still called. Otherwise unstable
1707 * pages may be leaked. */
1708 spin_lock(&req->rq_lock);
1709 if (likely(req->rq_unstable)) {
1710 req->rq_unstable = 0;
1711 spin_unlock(&req->rq_lock);
1713 osc_dec_unstable_pages(req);
/* Not yet marked unstable: just record the commit so the other
 * path knows it no longer needs to wait. */
1715 req->rq_committed = 1;
1716 spin_unlock(&req->rq_lock);
1721 * Build an RPC by the list of extent @ext_list. The caller must ensure
1722 * that the total pages in this list are NOT over max pages per RPC.
1723 * Extents in the list must be in OES_RPC state.
/*
 * Collects the pages of every extent into a sorted brw_page array,
 * allocates the cl_req and obdo, prepares the BRW request via
 * osc_brw_prep_request(), attaches the async args and hands the request
 * to a ptlrpcd thread.  On error all extents are finished with rc and
 * intermediate allocations are released.
 * NOTE(review): lines are elided in this view (several allocations,
 * error labels and the final RETURN are not visible) -- confirm against
 * full source.
 */
1725 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1726 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1728 struct ptlrpc_request *req = NULL;
1729 struct osc_extent *ext;
1730 struct brw_page **pga = NULL;
1731 struct osc_brw_async_args *aa = NULL;
1732 struct obdo *oa = NULL;
1733 struct osc_async_page *oap;
1734 struct osc_async_page *tmp;
1735 struct cl_req *clerq = NULL;
1736 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1738 struct cl_req_attr *crattr = NULL;
1739 loff_t starting_offset = OBD_OBJECT_EOF;
1740 loff_t ending_offset = 0;
1744 bool soft_sync = false;
1747 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1748 struct ost_body *body;
1750 LASSERT(!list_empty(ext_list));
1752 /* add pages into rpc_list to build BRW rpc */
1753 list_for_each_entry(ext, ext_list, oe_link) {
1754 LASSERT(ext->oe_state == OES_RPC);
1755 mem_tight |= ext->oe_memalloc;
1756 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1758 list_add_tail(&oap->oap_rpc_item, &rpc_list);
/* Track the overall [starting_offset, ending_offset) range. */
1759 if (starting_offset == OBD_OBJECT_EOF ||
1760 starting_offset > oap->oap_obj_off)
1761 starting_offset = oap->oap_obj_off;
1763 LASSERT(oap->oap_page_off == 0);
1764 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1765 ending_offset = oap->oap_obj_off +
1768 LASSERT(oap->oap_page_off + oap->oap_count ==
1773 soft_sync = osc_over_unstable_soft_limit(cli);
/* Under memory pressure, build the RPC with emergency reserves. */
1775 mpflag = cfs_memory_pressure_get_and_set();
1777 OBD_ALLOC(crattr, sizeof(*crattr));
1779 GOTO(out, rc = -ENOMEM);
1781 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1783 GOTO(out, rc = -ENOMEM);
1787 GOTO(out, rc = -ENOMEM);
1790 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1791 struct cl_page *page = oap2cl_page(oap);
/* Lazily allocate the cl_req on the first page. */
1792 if (clerq == NULL) {
1793 clerq = cl_req_alloc(env, page, crt,
1794 1 /* only 1-object rpcs for now */);
1796 GOTO(out, rc = PTR_ERR(clerq));
1799 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1801 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1802 pga[i] = &oap->oap_brw_page;
1803 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1804 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1805 pga[i]->pg, page_index(oap->oap_page), oap,
1808 cl_req_page_add(env, clerq, page);
1811 /* always get the data for the obdo for the rpc */
1812 LASSERT(clerq != NULL);
1813 crattr->cra_oa = oa;
1814 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1816 rc = cl_req_prep(env, clerq);
1818 CERROR("cl_req_prep failed: %d\n", rc);
/* Offset order helps the OST allocate disk blocks sequentially. */
1822 sort_brw_pages(pga, page_count);
1823 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req,
1824 crattr->cra_capa, 1, 0);
1826 CERROR("prep_req failed: %d\n", rc);
1830 req->rq_commit_cb = brw_commit;
1831 req->rq_interpret_reply = brw_interpret;
1834 req->rq_memalloc = 1;
1836 /* Need to update the timestamps after the request is built in case
1837 * we race with setattr (locally or in queue at OST). If OST gets
1838 * later setattr before earlier BRW (as determined by the request xid),
1839 * the OST will not use BRW timestamps. Sadly, there is no obvious
1840 * way to do this in a single call. bug 10150 */
1841 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1842 crattr->cra_oa = &body->oa;
1843 cl_req_attr_set(env, clerq, crattr,
1844 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1846 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1848 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1849 aa = ptlrpc_req_async_args(req);
/* Move ownership of the oaps and extents into the request's aa. */
1850 INIT_LIST_HEAD(&aa->aa_oaps);
1851 list_splice_init(&rpc_list, &aa->aa_oaps);
1852 INIT_LIST_HEAD(&aa->aa_exts);
1853 list_splice_init(ext_list, &aa->aa_exts);
1854 aa->aa_clerq = clerq;
1856 /* queued sync pages can be torn down while the pages
1857 * were between the pending list and the rpc */
1859 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1860 /* only one oap gets a request reference */
1863 if (oap->oap_interrupted && !req->rq_intr) {
1864 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1866 ptlrpc_mark_interrupted(req);
1870 tmp->oap_request = ptlrpc_request_addref(req);
/* Account the RPC in flight and feed the lprocfs histograms. */
1872 spin_lock(&cli->cl_loi_list_lock);
1873 starting_offset >>= PAGE_CACHE_SHIFT;
1874 if (cmd == OBD_BRW_READ) {
1875 cli->cl_r_in_flight++;
1876 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1877 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1878 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1879 starting_offset + 1);
1881 cli->cl_w_in_flight++;
1882 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1883 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1884 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1885 starting_offset + 1);
1887 spin_unlock(&cli->cl_loi_list_lock);
1889 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1890 page_count, aa, cli->cl_r_in_flight,
1891 cli->cl_w_in_flight);
1893 /* XXX: Maybe the caller can check the RPC bulk descriptor to
1894 * see which CPU/NUMA node the majority of pages were allocated
1895 * on, and try to assign the async RPC to the CPU core
1896 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1898 * But on the other hand, we expect that multiple ptlrpcd
1899 * threads and the initial write sponsor can run in parallel,
1900 * especially when data checksum is enabled, which is CPU-bound
1901 * operation and single ptlrpcd thread cannot process in time.
1902 * So more ptlrpcd threads sharing BRW load
1903 * (with PDL_POLICY_ROUND) seems better.
1905 ptlrpcd_add_req(req, pol, -1);
/* Error/cleanup path: release whatever was allocated above. */
1911 cfs_memory_pressure_restore(mpflag);
1913 if (crattr != NULL) {
1914 capa_put(crattr->cra_capa);
1915 OBD_FREE(crattr, sizeof(*crattr));
1919 LASSERT(req == NULL);
1924 OBD_FREE(pga, sizeof(*pga) * count);
/* this should happen rarely and is pretty bad, it makes the
1926 * pending list not follow the dirty order */
1927 while (!list_empty(ext_list)) {
1928 ext = list_entry(ext_list->next, struct osc_extent,
1930 list_del_init(&ext->oe_link);
1931 osc_extent_finish(env, ext, 0, rc);
1933 if (clerq && !IS_ERR(clerq))
1934 cl_req_completion(env, clerq, rc);
/*
 * Attach @einfo->ei_cbdata to the lock's l_ast_data if it is unset, and
 * report whether the lock now carries this data.  The LASSERTs check
 * the lock's callbacks/type match the enqueue info it should have been
 * created with.
 * NOTE(review): the return statements are elided in this view; the
 * result is presumably used as a boolean "data set" flag by callers --
 * confirm against full source.
 */
1939 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1940 struct ldlm_enqueue_info *einfo)
1942 void *data = einfo->ei_cbdata;
1945 LASSERT(lock != NULL);
1946 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1947 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1948 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1949 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1951 lock_res_and_lock(lock);
/* Only claim the slot if it is free; never overwrite foreign data. */
1953 if (lock->l_ast_data == NULL)
1954 lock->l_ast_data = data;
1955 if (lock->l_ast_data == data)
1958 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check().  Logs an
 * error when the handle no longer resolves to a lock (the client was
 * likely evicted and the lock destroyed).
 */
1963 static int osc_set_data_with_check(struct lustre_handle *lockh,
1964 struct ldlm_enqueue_info *einfo)
1966 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1970 set = osc_set_lock_data_with_check(lock, einfo);
1971 LDLM_LOCK_PUT(lock);
1973 CERROR("lockh %p, data %p - client evicted?\n",
1974 lockh, einfo->ei_cbdata);
/*
 * Common completion for an extent lock enqueue.  For intent enqueues
 * that were aborted by the server, translate the status carried in
 * lock_policy_res1; mark the LVB as ready on success; invoke the
 * caller's upcall with the final error code; and drop the extra lock
 * reference taken in ldlm_cli_enqueue().
 */
1978 static int osc_enqueue_fini(struct ptlrpc_request *req,
1979 osc_enqueue_upcall_f upcall, void *cookie,
1980 struct lustre_handle *lockh, ldlm_mode_t mode,
1981 __u64 *flags, int agl, int errcode)
1983 bool intent = *flags & LDLM_FL_HAS_INTENT;
1987 /* The request was created before ldlm_cli_enqueue call. */
1988 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1989 struct ldlm_reply *rep;
1991 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1992 LASSERT(rep != NULL);
/* The intent result travels in lock_policy_res1 in network order. */
1994 rep->lock_policy_res1 =
1995 ptlrpc_status_ntoh(rep->lock_policy_res1);
1996 if (rep->lock_policy_res1)
1997 errcode = rep->lock_policy_res1;
1999 *flags |= LDLM_FL_LVB_READY;
2000 } else if (errcode == ELDLM_OK) {
2001 *flags |= LDLM_FL_LVB_READY;
2004 /* Call the update callback. */
2005 rc = (*upcall)(cookie, lockh, errcode);
2007 /* release the reference taken in ldlm_cli_enqueue() */
2008 if (errcode == ELDLM_LOCK_MATCHED)
2010 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2011 ldlm_lock_decref(lockh, mode);
/*
 * Async interpret callback for osc_enqueue_base().  Resolves the lock
 * from the saved handle, takes an extra reference so a blocking AST
 * posted for a failed lock cannot run before the upcall completes, then
 * finishes the ldlm enqueue and runs osc_enqueue_fini().
 * NOTE(review): lines are elided in this view (e.g. the condition that
 * guards resetting oa_lvb/oa_flags, presumably the AGL path) -- confirm
 * against full source.
 */
2016 static int osc_enqueue_interpret(const struct lu_env *env,
2017 struct ptlrpc_request *req,
2018 struct osc_enqueue_args *aa, int rc)
2020 struct ldlm_lock *lock;
2021 struct lustre_handle *lockh = &aa->oa_lockh;
2022 ldlm_mode_t mode = aa->oa_mode;
2023 struct ost_lvb *lvb = aa->oa_lvb;
2024 __u32 lvb_len = sizeof(*lvb);
2029 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2031 lock = ldlm_handle2lock(lockh);
2032 LASSERTF(lock != NULL,
2033 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2034 lockh->cookie, req, aa);
2036 /* Take an additional reference so that a blocking AST that
2037 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2038 * to arrive after an upcall has been executed by
2039 * osc_enqueue_fini(). */
2040 ldlm_lock_addref(lockh, mode);
2042 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2043 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2045 /* Let CP AST to grant the lock first. */
2046 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* No caller-provided lvb/flags were saved; use a local flags word so
 * ldlm_cli_enqueue_fini()/osc_enqueue_fini() have somewhere to write. */
2049 LASSERT(aa->oa_lvb == NULL);
2050 LASSERT(aa->oa_flags == NULL);
2051 aa->oa_flags = &flags;
2054 /* Complete obtaining the lock procedure. */
2055 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2056 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2058 /* Complete osc stuff. */
2059 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2060 aa->oa_flags, aa->oa_agl, rc);
2062 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above, then the handle2lock ref. */
2064 ldlm_lock_decref(lockh, mode);
2065 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass this instead of a real set
 * to have the request handed directly to a ptlrpcd daemon thread (see
 * the rqset == PTLRPCD_SET check in osc_enqueue_base()).  Never
 * dereferenced. */
2069 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2071 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2072 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2073 * other synchronous requests, however keeping some locks and trying to obtain
2074 * others may take a considerable amount of time in a case of ost failure; and
2075 * when other sync requests do not get released lock from a client, the client
2076 * is evicted from the cluster -- such scenarious make the life difficult, so
2077 * release locks just after they are obtained. */
/*
 * Enqueue (or match) an OST extent lock.  First tries to match an
 * already-granted local lock; otherwise, for intent enqueues, builds an
 * LDLM_ENQUEUE request with an LVB buffer and either sends it
 * asynchronously (interpret callback osc_enqueue_interpret) or waits
 * and finishes it inline via osc_enqueue_fini().
 * NOTE(review): lines are elided in this view (kms_valid handling,
 * several returns) -- confirm against full source.
 */
2078 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2079 __u64 *flags, ldlm_policy_data_t *policy,
2080 struct ost_lvb *lvb, int kms_valid,
2081 osc_enqueue_upcall_f upcall, void *cookie,
2082 struct ldlm_enqueue_info *einfo,
2083 struct ptlrpc_request_set *rqset, int async, int agl)
2085 struct obd_device *obd = exp->exp_obd;
2086 struct lustre_handle lockh = { 0 };
2087 struct ptlrpc_request *req = NULL;
2088 int intent = *flags & LDLM_FL_HAS_INTENT;
/* AGL (async glimpse lock) must not require a ready LVB to match. */
2089 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2094 /* Filesystem lock extents are extended to page boundaries so that
2095 * dealing with the page cache is a little smoother. */
2096 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2097 policy->l_extent.end |= ~PAGE_MASK;
2100 * kms is not valid when either object is completely fresh (so that no
2101 * locks are cached), or object was evicted. In the latter case cached
2102 * lock cannot be used, because it would prime inode state with
2103 * potentially stale LVB.
2108 /* Next, search for already existing extent locks that will cover us */
2109 /* If we're trying to read, we also search for an existing PW lock. The
2110 * VFS and page cache already protect us locally, so lots of readers/
2111 * writers can share a single PW lock.
2113 * There are problems with conversion deadlocks, so instead of
2114 * converting a read lock to a write lock, we'll just enqueue a new
2117 * At some point we should cancel the read lock instead of making them
2118 * send us a blocking callback, but there are problems with canceling
2119 * locks out from other users right now, too. */
2120 mode = einfo->ei_mode;
2121 if (einfo->ei_mode == LCK_PR)
2123 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2124 einfo->ei_type, policy, mode, &lockh, 0);
2126 struct ldlm_lock *matched;
2128 if (*flags & LDLM_FL_TEST_LOCK)
2131 matched = ldlm_handle2lock(&lockh);
2133 /* AGL enqueues DLM locks speculatively. Therefore if
2134 * it already exists a DLM lock, it wll just inform the
2135 * caller to cancel the AGL process for this stripe. */
2136 ldlm_lock_decref(&lockh, mode);
2137 LDLM_LOCK_PUT(matched);
2139 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2140 *flags |= LDLM_FL_LVB_READY;
2142 /* We already have a lock, and it's referenced. */
2143 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2145 ldlm_lock_decref(&lockh, mode);
2146 LDLM_LOCK_PUT(matched);
2149 ldlm_lock_decref(&lockh, mode);
2150 LDLM_LOCK_PUT(matched);
2155 if (*flags & LDLM_FL_TEST_LOCK)
/* No matching lock: build a new intent enqueue request. */
2159 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2160 &RQF_LDLM_ENQUEUE_LVB);
2164 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2166 ptlrpc_request_free(req);
2170 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2172 ptlrpc_request_set_replen(req);
2175 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2176 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2178 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2179 sizeof(*lvb), LVB_T_OST, &lockh, async);
2182 struct osc_enqueue_args *aa;
2183 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2184 aa = ptlrpc_req_async_args(req);
2186 aa->oa_mode = einfo->ei_mode;
2187 aa->oa_type = einfo->ei_type;
2188 lustre_handle_copy(&aa->oa_lockh, &lockh);
2189 aa->oa_upcall = upcall;
2190 aa->oa_cookie = cookie;
2193 aa->oa_flags = flags;
2196 /* AGL is essentially to enqueue an DLM lock
2197 * in advance, so we don't care about the
2198 * result of AGL enqueue. */
2200 aa->oa_flags = NULL;
2203 req->rq_interpret_reply =
2204 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2205 if (rqset == PTLRPCD_SET)
2206 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2208 ptlrpc_set_add_req(rqset, req);
2209 } else if (intent) {
2210 ptlrpc_req_finished(req);
/* Synchronous path: finish the enqueue inline. */
2215 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2218 ptlrpc_req_finished(req);
/*
 * Match an already-granted extent lock covering @policy.  Extents are
 * rounded out to page boundaries first; since a PW lock also satisfies
 * readers, a PR request may match a PW lock, in which case the held
 * reference is converted (addref PR / decref PW) unless
 * LDLM_FL_TEST_LOCK was given.
 * NOTE(review): lines are elided in this view; the mode actually passed
 * to ldlm_lock_match travels through 'rc' here -- confirm against full
 * source.
 */
2223 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2224 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2225 __u64 *flags, void *data, struct lustre_handle *lockh,
2228 struct obd_device *obd = exp->exp_obd;
2229 __u64 lflags = *flags;
2233 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2236 /* Filesystem lock extents are extended to page boundaries so that
2237 * dealing with the page cache is a little smoother */
2238 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2239 policy->l_extent.end |= ~PAGE_MASK;
2241 /* Next, search for already existing extent locks that will cover us */
2242 /* If we're trying to read, we also search for an existing PW lock. The
2243 * VFS and page cache already protect us locally, so lots of readers/
2244 * writers can share a single PW lock. */
2248 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2249 res_id, type, policy, rc, lockh, unref);
/* Attach caller data; drop the match reference if that fails. */
2252 if (!osc_set_data_with_check(lockh, data)) {
2253 if (!(lflags & LDLM_FL_TEST_LOCK))
2254 ldlm_lock_decref(lockh, rc);
/* Matched a PW lock for a PR request: swap the reference so the
 * caller holds the mode it asked for. */
2258 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2259 ldlm_lock_addref(lockh, LCK_PR);
2260 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Interpret callback for an async OST_STATFS RPC: unpack the
 * obd_statfs reply into the caller's obd_info and invoke its oi_cb_up
 * completion callback.  -ENOTCONN/-EAGAIN on OBD_STATFS_NODELAY
 * requests are handled specially (lines elided in this view).
 */
2267 static int osc_statfs_interpret(const struct lu_env *env,
2268 struct ptlrpc_request *req,
2269 struct osc_async_args *aa, int rc)
2271 struct obd_statfs *msfs;
2275 /* The request has in fact never been sent
2276 * due to issues at a higher level (LOV).
2277 * Exit immediately since the caller is
2278 * aware of the problem and takes care
2279 * of the clean up */
2282 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2283 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2289 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2291 GOTO(out, rc = -EPROTO);
/* Copy the server's statfs data out to the caller's buffer. */
2294 *aa->aa_oi->oi_osfs = *msfs;
2296 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Fire an OST_STATFS RPC asynchronously on @rqset; the reply is
 * delivered to @oinfo through osc_statfs_interpret().
 * NOTE(review): @max_age is unused in the visible code -- see the
 * comment below about possibly passing it in the request.
 */
2300 static int osc_statfs_async(struct obd_export *exp,
2301 struct obd_info *oinfo, __u64 max_age,
2302 struct ptlrpc_request_set *rqset)
2304 struct obd_device *obd = class_exp2obd(exp);
2305 struct ptlrpc_request *req;
2306 struct osc_async_args *aa;
2310 /* We could possibly pass max_age in the request (as an absolute
2311 * timestamp or a "seconds.usec ago") so the target can avoid doing
2312 * extra calls into the filesystem if that isn't necessary (e.g.
2313 * during mount that would help a bit). Having relative timestamps
2314 * is not so great if request processing is slow, while absolute
2315 * timestamps are not ideal because they need time synchronization. */
2316 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2322 ptlrpc_request_free(req);
2325 ptlrpc_request_set_replen(req);
/* STATFS goes to the create portal, with adaptive timeouts. */
2326 req->rq_request_portal = OST_CREATE_PORTAL;
2327 ptlrpc_at_set_req_timeout(req);
2329 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2330 /* procfs requests not want stat in wait for avoid deadlock */
2331 req->rq_no_resend = 1;
2332 req->rq_no_delay = 1;
2335 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2336 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2337 aa = ptlrpc_req_async_args(req);
2340 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS.  Takes a reference on the import under
 * cl_sem to avoid racing with client_disconnect_export (Bug15684),
 * queues the RPC and waits, then unpacks the obd_statfs reply.
 * NOTE(review): the copy into @osfs and the import release on the
 * normal path are elided in this view -- confirm against full source.
 */
2344 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2345 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2347 struct obd_device *obd = class_exp2obd(exp);
2348 struct obd_statfs *msfs;
2349 struct ptlrpc_request *req;
2350 struct obd_import *imp = NULL;
2354 /*Since the request might also come from lprocfs, so we need
2355 *sync this with client_disconnect_export Bug15684*/
2356 down_read(&obd->u.cli.cl_sem);
2357 if (obd->u.cli.cl_import)
2358 imp = class_import_get(obd->u.cli.cl_import);
2359 up_read(&obd->u.cli.cl_sem);
2363 /* We could possibly pass max_age in the request (as an absolute
2364 * timestamp or a "seconds.usec ago") so the target can avoid doing
2365 * extra calls into the filesystem if that isn't necessary (e.g.
2366 * during mount that would help a bit). Having relative timestamps
2367 * is not so great if request processing is slow, while absolute
2368 * timestamps are not ideal because they need time synchronization. */
2369 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The allocation path below drops the import reference. */
2371 class_import_put(imp);
2376 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2378 ptlrpc_request_free(req);
2381 ptlrpc_request_set_replen(req);
2382 req->rq_request_portal = OST_CREATE_PORTAL;
2383 ptlrpc_at_set_req_timeout(req);
2385 if (flags & OBD_STATFS_NODELAY) {
2386 /* procfs requests not want stat in wait for avoid deadlock */
2387 req->rq_no_resend = 1;
2388 req->rq_no_delay = 1;
2391 rc = ptlrpc_queue_wait(req);
2395 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2397 GOTO(out, rc = -EPROTO);
2404 ptlrpc_req_finished(req);
/*
 * ioctl entry point for the OSC device.
 *
 * Pins this module for the duration of the call (try_module_get /
 * module_put) so it cannot be unloaded mid-ioctl, then dispatches on
 * @cmd.  Unrecognized commands return -ENOTTY per ioctl convention.
 *
 * NOTE(review): this view of the file is elided -- the switch statement
 * braces, break statements and RETURN are not visible here.
 */
2408 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2409 void *karg, void *uarg)
2411 struct obd_device *obd = exp->exp_obd;
2412 struct obd_ioctl_data *data = karg;
2416 if (!try_module_get(THIS_MODULE)) {
2417 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2418 module_name(THIS_MODULE));
/* force reconnection/recovery of the import to the named target */
2422 case OBD_IOC_CLIENT_RECOVER:
2423 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2424 data->ioc_inlbuf1, 0);
2428 case IOC_OSC_SET_ACTIVE:
2429 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2432 case OBD_IOC_POLL_QUOTACHECK:
2433 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2435 case OBD_IOC_PING_TARGET:
2436 err = ptlrpc_obd_ping(obd);
2439 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2440 cmd, current_comm());
2441 GOTO(out, err = -ENOTTY);
/* release the module reference taken at entry */
2444 module_put(THIS_MODULE);
/*
 * obd_set_info_async handler: apply a (key, value) setting to this OSC.
 *
 * Well-known keys (checksum, sptlrpc config, ctx flush, client cache
 * attach, LRU shrink) are handled locally; anything else is forwarded
 * to the OST as an OST_SET_INFO RPC.  KEY_GRANT_SHRINK requests are
 * special-cased: they carry an ost_body and are queued on the ptlrpcd
 * daemon instead of the caller's set.
 *
 * NOTE(review): this view of the file is elided -- braces, RETURN
 * statements and some error-handling lines are not visible here.
 */
2448 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2449 u32 keylen, void *key,
2450 u32 vallen, void *val,
2451 struct ptlrpc_request_set *set)
2453 struct ptlrpc_request *req;
2454 struct obd_device *obd = exp->exp_obd;
2455 struct obd_import *imp = class_exp2cliimp(exp);
2460 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* toggle client-side data checksumming; value is a plain int flag */
2462 if (KEY_IS(KEY_CHECKSUM)) {
2463 if (vallen != sizeof(int))
2465 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2469 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2470 sptlrpc_conf_client_adapt(obd);
2474 if (KEY_IS(KEY_FLUSH_CTX)) {
2475 sptlrpc_import_flush_my_ctx(imp);
/* attach this OSC to a shared client page cache (done once at setup) */
2479 if (KEY_IS(KEY_CACHE_SET)) {
2480 struct client_obd *cli = &obd->u.cli;
2482 LASSERT(cli->cl_cache == NULL); /* only once */
2483 cli->cl_cache = (struct cl_client_cache *)val;
2484 cl_cache_incref(cli->cl_cache);
2485 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2487 /* add this osc into entity list */
2488 LASSERT(list_empty(&cli->cl_lru_osc));
2489 spin_lock(&cli->cl_cache->ccc_lru_lock);
2490 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2491 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* shrink the LRU: release up to min(half of in-list pages, target) */
2496 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2497 struct client_obd *cli = &obd->u.cli;
2498 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2499 long target = *(long *)val;
2501 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* all remaining keys except grant-shrink require a caller-owned set */
2506 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2509 /* We pass all other commands directly to OST. Since nobody calls osc
2510 methods directly and everybody is supposed to go through LOV, we
2511 assume lov checked invalid values for us.
2512 The only recognised values so far are evict_by_nid and mds_conn.
2513 Even if something bad goes through, we'd get a -EINVAL from OST
2516 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2517 &RQF_OST_SET_GRANT_INFO :
2522 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2523 RCL_CLIENT, keylen);
/* grant-shrink requests use a fixed-size ost_body, not a raw value */
2524 if (!KEY_IS(KEY_GRANT_SHRINK))
2525 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2526 RCL_CLIENT, vallen);
2527 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2529 ptlrpc_request_free(req);
2533 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2534 memcpy(tmp, key, keylen);
2535 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2538 memcpy(tmp, val, vallen);
2540 if (KEY_IS(KEY_GRANT_SHRINK)) {
2541 struct osc_grant_args *aa;
/* async args are embedded in the request; verify they fit */
2544 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2545 aa = ptlrpc_req_async_args(req);
2548 ptlrpc_req_finished(req);
2551 *oa = ((struct ost_body *)val)->oa;
2553 req->rq_interpret_reply = osc_shrink_grant_interpret;
2556 ptlrpc_request_set_replen(req);
2557 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2558 LASSERT(set != NULL);
2559 ptlrpc_set_add_req(set, req);
2560 ptlrpc_check_set(NULL, set);
/* grant-shrink: let a ptlrpcd daemon drive the request instead */
2562 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Reconnect handler: refresh the grant numbers advertised to the OST.
 *
 * On reconnect the client reports its current grant usage (available
 * grant plus grant consumed by dirty pages) so the server can restore
 * it; if that total is zero, ask for a default of twice the brw size.
 * Lost grant is reported via the debug log and reset.
 */
2567 static int osc_reconnect(const struct lu_env *env,
2568 struct obd_export *exp, struct obd_device *obd,
2569 struct obd_uuid *cluuid,
2570 struct obd_connect_data *data,
2573 struct client_obd *cli = &obd->u.cli;
2575 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* read grant state under cl_loi_list_lock for a consistent snapshot */
2578 spin_lock(&cli->cl_loi_list_lock);
2579 data->ocd_grant = (cli->cl_avail_grant +
2580 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2581 2 * cli_brw_size(obd);
2582 lost_grant = cli->cl_lost_grant;
2583 cli->cl_lost_grant = 0;
2584 spin_unlock(&cli->cl_loi_list_lock);
2586 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2587 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2588 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect handler: tear down the export via the generic client code,
 * then remove this OSC from the grant-shrink list -- but only after the
 * import is gone, for the ordering reason explained below.
 */
2594 static int osc_disconnect(struct obd_export *exp)
2596 struct obd_device *obd = class_exp2obd(exp);
2599 rc = client_disconnect_export(exp);
2601 * Initially we put del_shrink_grant before disconnect_export, but it
2602 * causes the following problem if setup (connect) and cleanup
2603 * (disconnect) are tangled together.
2604 * connect p1 disconnect p2
2605 * ptlrpc_connect_import
2606 * ............... class_manual_cleanup
2609 * ptlrpc_connect_interrupt
2611 * add this client to shrink list
2613 * Bang! pinger trigger the shrink.
2614 * So the osc should be disconnected from the shrink list, after we
2615 * are sure the import has been destroyed. BUG18662
2617 if (obd->u.cli.cl_import == NULL)
2618 osc_del_shrink_grant(&obd->u.cli);
/*
 * Import state-change callback: react to connection events on the
 * import backing this OSC (disconnect, invalidate, reconnect, etc.)
 * and propagate notifications up to the observer (normally LOV).
 *
 * NOTE(review): this view of the file is elided -- switch braces,
 * break statements and RETURN are not visible here.
 */
2622 static int osc_import_event(struct obd_device *obd,
2623 struct obd_import *imp,
2624 enum obd_import_event event)
2626 struct client_obd *cli;
2630 LASSERT(imp->imp_obd == obd);
2633 case IMP_EVENT_DISCON: {
/* connection lost: any outstanding grant is no longer trustworthy */
2635 spin_lock(&cli->cl_loi_list_lock);
2636 cli->cl_avail_grant = 0;
2637 cli->cl_lost_grant = 0;
2638 spin_unlock(&cli->cl_loi_list_lock);
2641 case IMP_EVENT_INACTIVE: {
2642 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2645 case IMP_EVENT_INVALIDATE: {
2646 struct ldlm_namespace *ns = obd->obd_namespace;
2650 env = cl_env_get(&refcheck);
2654 /* all pages go to failing rpcs due to the invalid
2656 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
/* drop all locks locally; the server side is already gone */
2658 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2659 cl_env_put(env, &refcheck);
2664 case IMP_EVENT_ACTIVE: {
2665 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2668 case IMP_EVENT_OCD: {
/* (re)connect completed: adopt the negotiated connect data */
2669 struct obd_connect_data *ocd = &imp->imp_connect_data;
2671 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2672 osc_init_grant(&obd->u.cli, ocd);
2675 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2676 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2678 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2681 case IMP_EVENT_DEACTIVATE: {
2682 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2685 case IMP_EVENT_ACTIVATE: {
2686 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2690 CERROR("Unknown import event %d\n", event);
2697 * Determine whether the lock can be canceled before replaying the lock
2698 * during recovery, see bug16774 for detailed information.
2700 * \retval zero the lock can't be canceled
2701 * \retval other ok to cancel
2703 static int osc_cancel_weight(struct ldlm_lock *lock)
2706 * Cancel all unused and granted extent lock.
/* "granted" means l_granted_mode matches l_req_mode; "unused" means
 * the weigh callback reports no pages covered (weight == 0) */
2708 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2709 lock->l_granted_mode == lock->l_req_mode &&
2710 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback for writeback: unplug queued I/O for the
 * client_obd passed in @data (set up in osc_setup()).
 */
2716 static int brw_queue_work(const struct lu_env *env, void *data)
2718 struct client_obd *cli = data;
2720 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2722 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * Device setup: initialize this OSC obd_device.
 *
 * Order of operations: take a ptlrpcd reference, run the generic client
 * setup, allocate the writeback and LRU ptlrpcd work items, set up
 * quota, register procfs entries, pre-allocate a request pool sized to
 * max_rpcs_in_flight, and register the lock-cancel weight callback.
 * On failure the work items are destroyed and client setup is undone
 * (see the cleanup lines at the bottom).
 *
 * NOTE(review): this view of the file is elided -- braces, RETURN
 * statements and some error-check lines are not visible here.
 */
2726 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2728 struct client_obd *cli = &obd->u.cli;
2729 struct obd_type *type;
2734 rc = ptlrpcd_addref();
2738 rc = client_obd_setup(obd, lcfg);
2740 GOTO(out_ptlrpcd, rc);
2742 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2743 if (IS_ERR(handler))
2744 GOTO(out_client_setup, rc = PTR_ERR(handler));
2745 cli->cl_writeback_work = handler;
2747 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2748 if (IS_ERR(handler))
2749 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2750 cli->cl_lru_work = handler;
2752 rc = osc_quota_setup(obd);
2754 GOTO(out_ptlrpcd_work, rc);
2756 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2758 #ifdef CONFIG_PROC_FS
2759 obd->obd_vars = lprocfs_osc_obd_vars;
2761 /* If this is true then both client (osc) and server (osp) are on the
2762 * same node. The osp layer if loaded first will register the osc proc
2763 * directory. In that case this obd_device will be attached its proc
2764 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2765 type = class_search_type(LUSTRE_OSP_NAME);
2766 if (type && type->typ_procsym) {
2767 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2769 obd->obd_vars, obd);
2770 if (IS_ERR(obd->obd_proc_entry)) {
2771 rc = PTR_ERR(obd->obd_proc_entry);
2772 CERROR("error %d setting up lprocfs for %s\n", rc,
2774 obd->obd_proc_entry = NULL;
2777 rc = lprocfs_obd_setup(obd);
2780 /* If the basic OSC proc tree construction succeeded then
2781 * lets do the rest. */
2783 lproc_osc_attach_seqstat(obd);
2784 sptlrpc_lprocfs_cliobd_attach(obd);
2785 ptlrpc_lprocfs_register_obd(obd);
2788 /* We need to allocate a few requests more, because
2789 * brw_interpret tries to create new requests before freeing
2790 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2791 * reserved, but I'm afraid that might be too much wasted RAM
2792 * in fact, so 2 is just my guess and still should work. */
2793 cli->cl_import->imp_rq_pool =
2794 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2796 ptlrpc_add_rqs_to_pool);
2798 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2799 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* error unwind: destroy whichever work items were created above */
2803 if (cli->cl_writeback_work != NULL) {
2804 ptlrpcd_destroy_work(cli->cl_writeback_work);
2805 cli->cl_writeback_work = NULL;
2807 if (cli->cl_lru_work != NULL) {
2808 ptlrpcd_destroy_work(cli->cl_lru_work);
2809 cli->cl_lru_work = NULL;
2812 client_obd_cleanup(obd);
/*
 * Staged pre-cleanup handler.
 *
 * OBD_CLEANUP_EARLY: deactivate the import and stop pinging it so no
 * new RPCs are started during teardown.
 * OBD_CLEANUP_EXPORTS: wait for zombie exports, destroy the ptlrpcd
 * work items, tear down the client import and procfs entries.
 *
 * NOTE(review): this view of the file is elided -- switch braces,
 * break statements and RETURN are not visible here.
 */
2818 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2824 case OBD_CLEANUP_EARLY: {
2825 struct obd_import *imp;
2826 imp = obd->u.cli.cl_import;
2827 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2828 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2829 ptlrpc_deactivate_import(imp);
2830 spin_lock(&imp->imp_lock);
2831 imp->imp_pingable = 0;
2832 spin_unlock(&imp->imp_lock);
2835 case OBD_CLEANUP_EXPORTS: {
2836 struct client_obd *cli = &obd->u.cli;
2838 * for echo client, export may be on zombie list, wait for
2839 * zombie thread to cull it, because cli.cl_import will be
2840 * cleared in client_disconnect_export():
2841 * class_export_destroy() -> obd_cleanup() ->
2842 * echo_device_free() -> echo_client_cleanup() ->
2843 * obd_disconnect() -> osc_disconnect() ->
2844 * client_disconnect_export()
2846 obd_zombie_barrier();
2847 if (cli->cl_writeback_work) {
2848 ptlrpcd_destroy_work(cli->cl_writeback_work);
2849 cli->cl_writeback_work = NULL;
2851 if (cli->cl_lru_work) {
2852 ptlrpcd_destroy_work(cli->cl_lru_work);
2853 cli->cl_lru_work = NULL;
2855 obd_cleanup_client_import(obd);
2856 ptlrpc_lprocfs_unregister_obd(obd);
2857 lprocfs_obd_cleanup(obd);
/*
 * Final cleanup: detach this OSC from the shared client cache (undoing
 * the KEY_CACHE_SET attach in osc_set_info_async()), release quota
 * state, and run the generic client teardown.
 */
2864 int osc_cleanup(struct obd_device *obd)
2866 struct client_obd *cli = &obd->u.cli;
2872 if (cli->cl_cache != NULL) {
2873 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
/* unlink from the cache's LRU list under its lock, then drop our ref */
2874 spin_lock(&cli->cl_cache->ccc_lru_lock);
2875 list_del_init(&cli->cl_lru_osc);
2876 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2877 cli->cl_lru_left = NULL;
2878 cl_cache_decref(cli->cl_cache);
2879 cli->cl_cache = NULL;
2882 /* free memory of osc quota cache */
2883 osc_quota_cleanup(obd);
2885 rc = client_obd_cleanup(obd);
/*
 * Apply a configuration record by matching it against the OSC procfs
 * parameter table.  class_process_proc_param() returns > 0 for
 * "handled"; normalize that to 0 so callers see success/errno only.
 */
2891 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2893 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2894 return rc > 0 ? 0: rc;
/*
 * obd_ops process_config entry point: thin wrapper that forwards the
 * (lustre_cfg *) payload to osc_process_config_base().
 */
2897 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2899 return osc_process_config_base(obd, buf);
/*
 * obd_ops method table for the OSC device type, registered with
 * class_register_type() in osc_init().  Connection management is
 * delegated to the generic client_* helpers; everything else is
 * implemented by the osc_* handlers in this file.
 */
2902 static struct obd_ops osc_obd_ops = {
2903 .o_owner = THIS_MODULE,
2904 .o_setup = osc_setup,
2905 .o_precleanup = osc_precleanup,
2906 .o_cleanup = osc_cleanup,
2907 .o_add_conn = client_import_add_conn,
2908 .o_del_conn = client_import_del_conn,
2909 .o_connect = client_connect_import,
2910 .o_reconnect = osc_reconnect,
2911 .o_disconnect = osc_disconnect,
2912 .o_statfs = osc_statfs,
2913 .o_statfs_async = osc_statfs_async,
2914 .o_create = osc_create,
2915 .o_destroy = osc_destroy,
2916 .o_getattr = osc_getattr,
2917 .o_setattr = osc_setattr,
2918 .o_iocontrol = osc_iocontrol,
2919 .o_set_info_async = osc_set_info_async,
2920 .o_import_event = osc_import_event,
2921 .o_process_config = osc_process_config,
2922 .o_quotactl = osc_quotactl,
2923 .o_quotacheck = osc_quotacheck,
/*
 * Module init: set up the OSC kmem caches and register the device
 * type.  Proc registration is skipped when the OSP type has already
 * claimed the shared proc directory (client and server on one node).
 * On registration failure the caches are torn down again.
 */
2926 static int __init osc_init(void)
2928 bool enable_proc = true;
2929 struct obd_type *type;
2933 /* print an address of _any_ initialized kernel symbol from this
2934 * module, to allow debugging with gdb that doesn't support data
2935 * symbols from modules.*/
2936 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2938 rc = lu_kmem_init(osc_caches);
2942 type = class_search_type(LUSTRE_OSP_NAME);
2943 if (type != NULL && type->typ_procsym != NULL)
2944 enable_proc = false;
2946 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2947 LUSTRE_OSC_NAME, &osc_device_type);
/* registration failed: undo the cache setup before returning */
2949 lu_kmem_fini(osc_caches);
/*
 * Module exit: unregister the OSC device type and free its kmem
 * caches -- the reverse of osc_init().
 */
2956 static void /*__exit*/ osc_exit(void)
2958 class_unregister_type(LUSTRE_OSC_NAME);
2959 lu_kmem_fini(osc_caches);
/* Kernel module metadata and entry/exit point registration. */
2962 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2963 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2964 MODULE_VERSION(LUSTRE_VERSION_STRING);
2965 MODULE_LICENSE("GPL");
2967 module_init(osc_init);
2968 module_exit(osc_exit);