Whamcloud - gitweb
31ecfcfe587230cb9fdc36ecc6dea0596f90ccdc
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-RPC state for an asynchronous bulk (brw) request; stored in the
 * request's rq_async_args and consumed by brw_interpret() on completion. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;        /* attributes sent with / returned by the RPC */
        int                       aa_requested_nob;     /* total bytes requested */
        int                       aa_nio_count;         /* number of niobuf entries */
        u32                       aa_page_count;        /* pages referenced by aa_ppga */
        int                       aa_resends;           /* resend attempts so far */
        struct brw_page **aa_ppga;              /* array of pages in the transfer */
        struct client_obd        *aa_cli;       /* owning client obd */
        struct list_head          aa_oaps;      /* osc_async_pages in this RPC */
        struct list_head          aa_exts;      /* osc extents covered by this RPC */
        struct obd_capa  *aa_ocapa;     /* capability, may be NULL */
        struct cl_req            *aa_clerq;     /* cl_req this RPC belongs to */
};

/* Grant-shrink RPCs reuse the brw async-args layout (only aa_oa is used
 * by osc_shrink_grant_interpret()). */
#define osc_grant_args osc_brw_async_args
70
/* Async args for setattr/punch RPCs; consumed by osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;         /* attributes being set / refreshed from reply */
        obd_enqueue_update_f     sa_upcall;     /* completion callback */
        void                    *sa_cookie;     /* opaque argument for sa_upcall */
};
76
/* Async args for OST_SYNC RPCs; consumed by osc_sync_interpret(). */
struct osc_fsync_args {
        struct obd_info *fa_oi;                 /* obd_info whose oi_oa receives reply attrs */
        obd_enqueue_update_f     fa_upcall;     /* completion callback */
        void                    *fa_cookie;     /* opaque argument for fa_upcall */
};
82
/* Async args for DLM lock enqueue requests issued by this OSC. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;        /* export the enqueue was sent on */
        ldlm_type_t             oa_type;        /* lock type (e.g. extent) */
        ldlm_mode_t             oa_mode;        /* requested lock mode */
        __u64                   *oa_flags;      /* in/out LDLM flags */
        osc_enqueue_upcall_f    oa_upcall;      /* completion callback */
        void                    *oa_cookie;     /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;        /* lock value block from the server */
        struct lustre_handle    oa_lockh;       /* handle of the granted lock */
        unsigned int            oa_agl:1;       /* asynchronous glimpse lock request */
};
94
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
97                          void *data, int rc);
98
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100                                  struct ost_body *body, void *capa)
101 {
102         struct obd_capa *oc = (struct obd_capa *)capa;
103         struct lustre_capa *c;
104
105         if (!capa)
106                 return;
107
108         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
109         LASSERT(c);
110         capa_cpy(c, oc);
111         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112         DEBUG_CAPA(D_SEC, c, "pack");
113 }
114
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
116 {
117         struct ost_body *body;
118
119         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
120         LASSERT(body);
121
122         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
123                              oinfo->oi_oa);
124         osc_pack_capa(req, body, oinfo->oi_capa);
125 }
126
127 void osc_set_capa_size(struct ptlrpc_request *req,
128                        const struct req_msg_field *field,
129                        struct obd_capa *oc)
130 {
131         if (oc == NULL)
132                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
133         else
134                 /* it is already calculated as sizeof struct obd_capa */
135                 ;
136 }
137
/* Reply handler for an async OST_GETATTR: unpack the returned attributes
 * into the caller's obdo, then invoke the obd_info up-call with the
 * final status.  The up-call runs on both success and failure. */
int osc_getattr_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req,
                          struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                /* reply is untrustworthy: invalidate all attributes */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
166
/* Synchronous OST_GETATTR: send the attributes in oinfo->oi_oa to the OST
 * and refresh them from the reply.  Returns 0 on success, -ENOMEM if the
 * request cannot be allocated, -EPROTO on an unparsable reply, or the
 * RPC error from ptlrpc_queue_wait(). */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* size the capa field before packing the request buffers */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* blksize is filled in locally from the client's brw size */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
210
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and refresh the obdo from the reply.  The group must already be set in
 * the obdo (asserted below). */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* NOTE(review): EXIT followed by RETURN() below looks like it may
         * double-log the function exit — confirm against the libcfs debug
         * macros (osc_getattr above uses a plain return after EXIT). */
        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
252
/* Reply handler for async setattr/punch: refresh sa_oa from the reply and
 * invoke the caller's up-call with the final status (on both success and
 * failure paths). */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
273
/* Send an OST_SETATTR asynchronously.  When @rqset is NULL the request is
 * handed straight to ptlrpcd with no completion handling; otherwise
 * osc_setattr_interpret() runs on reply and calls @upcall(@cookie, rc).
 * Returns 0 once the request is queued, or a negative errno on setup
 * failure. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* carry the llog cookie along when the caller provided one */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
324
325 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
326                              struct obd_trans_info *oti,
327                              struct ptlrpc_request_set *rqset)
328 {
329         return osc_setattr_async_base(exp, oinfo, oti,
330                                       oinfo->oi_cb_up, oinfo, rqset);
331 }
332
/* Synchronous OST_CREATE.  Only echo-sequence objects are created through
 * this path (asserted below); the obdo is refreshed from the reply and the
 * llog cookie, if returned, is stored into @oti. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* o_flags is only meaningful when OBD_MD_FLFLAGS is set in o_valid */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* blksize is filled in locally from the client's brw size */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* hand the llog cookie (if the OST returned one) back to the caller */
        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
400
/* Send an OST_PUNCH (truncate) asynchronously.  Completion runs through
 * osc_setattr_interpret(), which calls @upcall(@cookie, rc).  Returns 0
 * once queued, or a negative errno on setup failure. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
445
/* Reply handler for OST_SYNC: copy the returned attributes into the
 * caller's obdo and invoke the up-call with the final status (runs on
 * both success and failure paths). */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
468
/* Send an OST_SYNC asynchronously.  The sync range travels in the obdo's
 * size/blocks fields (see comment below); completion runs through
 * osc_sync_interpret(), which calls @upcall(@cookie, rc). */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
513
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* resource name is derived from the object id in the obdo */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        /* hold a ref across the local cancel scan */
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
548
549 static int osc_destroy_interpret(const struct lu_env *env,
550                                  struct ptlrpc_request *req, void *data,
551                                  int rc)
552 {
553         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
554
555         atomic_dec(&cli->cl_destroy_in_flight);
556         wake_up(&cli->cl_destroy_waitq);
557         return 0;
558 }
559
/* Try to reserve a destroy-RPC slot.  Returns 1 with the slot held (caller
 * must send the destroy; osc_destroy_interpret() releases it), or 0 if the
 * in-flight limit is reached.  The inc/dec pair is lock-free, so a racing
 * release between the two atomics is detected and any waiter re-woken. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
577
578 /* Destroy requests can be async always on the client, and we don't even really
579  * care about the return code since the client cannot do anything at all about
580  * a destroy failure.
581  * When the MDS is unlinking a filename, it saves the file objects into a
582  * recovery llog, and these object records are cancelled when the OST reports
583  * they were destroyed and sync'd to disk (i.e. transaction committed).
584  * If the client dies, or the OST is down when the object should be destroyed,
585  * the records are not cancelled, and when the OST reconnects to the MDS next,
586  * it will retrieve the llog unlink logs and then sends the log cancellation
587  * cookies to the MDS after committing destroy transactions. */
588 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
589                        struct obdo *oa, struct obd_trans_info *oti)
590 {
591         struct client_obd     *cli = &exp->exp_obd->u.cli;
592         struct ptlrpc_request *req;
593         struct ost_body       *body;
594         struct list_head       cancels = LIST_HEAD_INIT(cancels);
595         int rc, count;
596         ENTRY;
597
598         if (!oa) {
599                 CDEBUG(D_INFO, "oa NULL\n");
600                 RETURN(-EINVAL);
601         }
602
603         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
604                                         LDLM_FL_DISCARD_DATA);
605
606         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
607         if (req == NULL) {
608                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
609                 RETURN(-ENOMEM);
610         }
611
612         osc_set_capa_size(req, &RMF_CAPA1, NULL);
613         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
614                                0, &cancels, count);
615         if (rc) {
616                 ptlrpc_request_free(req);
617                 RETURN(rc);
618         }
619
620         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
621         ptlrpc_at_set_req_timeout(req);
622
623         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
624                 oa->o_lcookie = *oti->oti_logcookies;
625         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
626         LASSERT(body);
627         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
628
629         ptlrpc_request_set_replen(req);
630
631         /* If osc_destory is for destroying the unlink orphan,
632          * sent from MDT to OST, which should not be blocked here,
633          * because the process might be triggered by ptlrpcd, and
634          * it is not good to block ptlrpcd thread (b=16006)*/
635         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
636                 req->rq_interpret_reply = osc_destroy_interpret;
637                 if (!osc_can_send_destroy(cli)) {
638                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
639                                                           NULL);
640
641                         /*
642                          * Wait until the number of on-going destroy RPCs drops
643                          * under max_rpc_in_flight
644                          */
645                         l_wait_event_exclusive(cli->cl_destroy_waitq,
646                                                osc_can_send_destroy(cli), &lwi);
647                 }
648         }
649
650         /* Do not wait for response */
651         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
652         RETURN(0);
653 }
654
/* Fill the obdo's dirty/undirty/grant/dropped accounting fields from the
 * client's cache state so the OST can track grant usage.  The caller must
 * not already have set OBD_MD_FLBLOCKS/OBD_MD_FLGRANT (asserted). */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        /* cl_loi_list_lock protects the dirty/grant counters read below */
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* normal case: announce room for a full pipeline of RPCs */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
703
704 void osc_update_next_shrink(struct client_obd *cli)
705 {
706         cli->cl_next_shrink_grant =
707                 cfs_time_shift(cli->cl_grant_shrink_interval);
708         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
709                cli->cl_next_shrink_grant);
710 }
711
/* Add @grant bytes back to the client's available grant, under the
 * cl_loi_list_lock that protects the grant counters. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
718
719 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
720 {
721         if (body->oa.o_valid & OBD_MD_FLGRANT) {
722                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
723                 __osc_update_grant(cli, body->oa.o_grant);
724         }
725 }
726
727 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
728                               u32 keylen, void *key,
729                               u32 vallen, void *val,
730                               struct ptlrpc_request_set *set);
731
/* Reply handler for a grant-shrink set_info RPC: on failure restore the
 * grant we tried to give back; on success absorb any grant the server
 * returned.  Frees the obdo allocated by the sender in either case. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* shrink failed: the grant is still ours, take it back */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
752
/* Prepare an obdo that gives back a quarter of the available grant:
 * deduct it locally under the lock, flag the obdo as a shrink request,
 * and reschedule the next shrink attempt. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        /* make o_flags valid before setting the shrink flag */
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
766
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        /* already at/below the full-pipeline target: drop to one RPC's worth */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
783
/* Give grant back to the server until only @target_bytes remain locally.
 * The excess is deducted under the lock and sent via a KEY_GRANT_SHRINK
 * set_info RPC; if sending fails the deducted grant is restored.  Returns
 * 0 if nothing needed shrinking or the RPC was issued, negative errno on
 * failure. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): cl_avail_grant may have changed since the check
         * above (lock was dropped); the deduction below uses the current
         * value, so o_grant can differ from the earlier estimate. */
        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
828
829 static int osc_should_shrink_grant(struct client_obd *client)
830 {
831         cfs_time_t time = cfs_time_current();
832         cfs_time_t next_shrink = client->cl_next_shrink_grant;
833
834         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
835              OBD_CONNECT_GRANT_SHRINK) == 0)
836                 return 0;
837
838         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
839                 /* Get the current RPC size directly, instead of going via:
840                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
841                  * Keep comment here so that it can be found by searching. */
842                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
843
844                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
845                     client->cl_avail_grant > brw_size)
846                         return 1;
847                 else
848                         osc_update_next_shrink(client);
849         }
850         return 0;
851 }
852
853 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
854 {
855         struct client_obd *client;
856
857         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
858                 if (osc_should_shrink_grant(client))
859                         osc_shrink_grant(client);
860         }
861         return 0;
862 }
863
864 static int osc_add_shrink_grant(struct client_obd *client)
865 {
866         int rc;
867
868         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
869                                        TIMEOUT_GRANT,
870                                        osc_grant_shrink_grant_cb, NULL,
871                                        &client->cl_grant_shrink_list);
872         if (rc) {
873                 CERROR("add grant client %s error %d\n",
874                         client->cl_import->imp_obd->obd_name, rc);
875                 return rc;
876         }
877         CDEBUG(D_CACHE, "add grant client %s \n",
878                client->cl_import->imp_obd->obd_name);
879         osc_update_next_shrink(client);
880         return 0;
881 }
882
883 static int osc_del_shrink_grant(struct client_obd *client)
884 {
885         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
886                                          TIMEOUT_GRANT);
887 }
888
/* Initialize this client's available grant from the server's connect
 * reply (@ocd), and register for periodic grant shrinking when the
 * server supports it.  Called at (re)connect time. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        /* a negative result means the server granted less than what we
         * still hold dirty - should not happen with fixed servers */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* start the shrink timer only once per client */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
928
929 /* We assume that the reason this OSC got a short read is because it read
930  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
931  * via the LOV, and it _knows_ it's reading inside the file, it's just that
932  * this stripe never got written at or beyond this stripe offset yet. */
933 static void handle_short_read(int nob_read, size_t page_count,
934                               struct brw_page **pga)
935 {
936         char *ptr;
937         int i = 0;
938
939         /* skip bytes read OK */
940         while (nob_read > 0) {
941                 LASSERT (page_count > 0);
942
943                 if (pga[i]->count > nob_read) {
944                         /* EOF inside this page */
945                         ptr = kmap(pga[i]->pg) +
946                                 (pga[i]->off & ~CFS_PAGE_MASK);
947                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
948                         kunmap(pga[i]->pg);
949                         page_count--;
950                         i++;
951                         break;
952                 }
953
954                 nob_read -= pga[i]->count;
955                 page_count--;
956                 i++;
957         }
958
959         /* zero remaining pages */
960         while (page_count-- > 0) {
961                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
962                 memset(ptr, 0, pga[i]->count);
963                 kunmap(pga[i]->pg);
964                 i++;
965         }
966 }
967
968 static int check_write_rcs(struct ptlrpc_request *req,
969                            int requested_nob, int niocount,
970                            size_t page_count, struct brw_page **pga)
971 {
972         int     i;
973         __u32   *remote_rcs;
974
975         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
976                                                   sizeof(*remote_rcs) *
977                                                   niocount);
978         if (remote_rcs == NULL) {
979                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
980                 return(-EPROTO);
981         }
982
983         /* return error if any niobuf was in error */
984         for (i = 0; i < niocount; i++) {
985                 if ((int)remote_rcs[i] < 0)
986                         return(remote_rcs[i]);
987
988                 if (remote_rcs[i] != 0) {
989                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
990                                 i, remote_rcs[i], req);
991                         return(-EPROTO);
992                 }
993         }
994
995         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
996                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
997                        req->rq_bulk->bd_nob_transferred, requested_nob);
998                 return(-EPROTO);
999         }
1000
1001         return (0);
1002 }
1003
1004 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1005 {
1006         if (p1->flag != p2->flag) {
1007                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1008                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1009                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1010
1011                 /* warn if we try to combine flags that we don't know to be
1012                  * safe to combine */
1013                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1014                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1015                               "report this at https://jira.hpdd.intel.com/\n",
1016                               p1->flag, p2->flag);
1017                 }
1018                 return 0;
1019         }
1020
1021         return (p1->off + p1->count == p2->off);
1022 }
1023
/* Compute the bulk-data checksum over the first @nob bytes of the @pga
 * page array using the algorithm selected by @cksum_type.  @opc (OST_READ
 * or OST_WRITE) only selects which checksum fault-injection hook applies.
 * Returns the checksum; on hash-init failure the negative PTR_ERR is
 * returned (cast through the u32 return type). */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* clamp the last page's contribution to the remaining nob */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                /* nob may go negative on the final page; loop ends anyway */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1078
/* Build a BRW (bulk read/write) RPC for @page_count pages described by
 * @pga.  Merges contiguous compatible pages into shared remote niobufs,
 * optionally attaches a bulk checksum, and may piggyback a grant shrink.
 * On success the prepared request is returned through @reqp and owns the
 * bulk descriptor; @resend marks the RPC with OBD_FL_RECOV_RESEND and the
 * capa is pinned only when @reserve is set.  Returns 0 or negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, u32 page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                /* writes allocate from the import's reserved pool so dirty
                 * page writeback cannot stall on request allocation */
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* one remote niobuf per run of contiguous, mergeable pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                /* pages must arrive in strictly increasing file order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf rather than start a
                         * new one */
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggyback a grant shrink on this RPC when one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1285
1286 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1287                                 __u32 client_cksum, __u32 server_cksum, int nob,
1288                                 size_t page_count, struct brw_page **pga,
1289                                 cksum_type_t client_cksum_type)
1290 {
1291         __u32 new_cksum;
1292         char *msg;
1293         cksum_type_t cksum_type;
1294
1295         if (server_cksum == client_cksum) {
1296                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1297                 return 0;
1298         }
1299
1300         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1301                                        oa->o_flags : 0);
1302         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1303                                       cksum_type);
1304
1305         if (cksum_type != client_cksum_type)
1306                 msg = "the server did not use the checksum type specified in "
1307                       "the original request - likely a protocol problem";
1308         else if (new_cksum == server_cksum)
1309                 msg = "changed on the client after we checksummed it - "
1310                       "likely false positive due to mmap IO (bug 11742)";
1311         else if (new_cksum == client_cksum)
1312                 msg = "changed in transit before arrival at OST";
1313         else
1314                 msg = "changed in transit AND doesn't match the original - "
1315                       "likely false positive due to mmap IO (bug 11742)";
1316
1317         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1318                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1319                            msg, libcfs_nid2str(peer->nid),
1320                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1321                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1322                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1323                            POSTID(&oa->o_oi), pga[0]->off,
1324                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1325         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1326                "client csum now %x\n", client_cksum, client_cksum_type,
1327                server_cksum, cksum_type, new_cksum);
1328         return 1;
1329 }
1330
1331 /* Note rc enters this function as number of bytes transferred */
1332 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1333 {
1334         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1335         const lnet_process_id_t *peer =
1336                         &req->rq_import->imp_connection->c_peer;
1337         struct client_obd *cli = aa->aa_cli;
1338         struct ost_body *body;
1339         u32 client_cksum = 0;
1340         ENTRY;
1341
1342         if (rc < 0 && rc != -EDQUOT) {
1343                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1344                 RETURN(rc);
1345         }
1346
1347         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1348         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1349         if (body == NULL) {
1350                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1351                 RETURN(-EPROTO);
1352         }
1353
1354         /* set/clear over quota flag for a uid/gid */
1355         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1356             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1357                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1358
1359                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1360                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1361                        body->oa.o_flags);
1362                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1363         }
1364
1365         osc_update_grant(cli, body);
1366
1367         if (rc < 0)
1368                 RETURN(rc);
1369
1370         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1371                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1372
1373         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1374                 if (rc > 0) {
1375                         CERROR("Unexpected +ve rc %d\n", rc);
1376                         RETURN(-EPROTO);
1377                 }
1378                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1379
1380                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1381                         RETURN(-EAGAIN);
1382
1383                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1384                     check_write_checksum(&body->oa, peer, client_cksum,
1385                                          body->oa.o_cksum, aa->aa_requested_nob,
1386                                          aa->aa_page_count, aa->aa_ppga,
1387                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1388                         RETURN(-EAGAIN);
1389
1390                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1391                                      aa->aa_page_count, aa->aa_ppga);
1392                 GOTO(out, rc);
1393         }
1394
1395         /* The rest of this function executes only for OST_READs */
1396
1397         /* if unwrap_bulk failed, return -EAGAIN to retry */
1398         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1399         if (rc < 0)
1400                 GOTO(out, rc = -EAGAIN);
1401
1402         if (rc > aa->aa_requested_nob) {
1403                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1404                        aa->aa_requested_nob);
1405                 RETURN(-EPROTO);
1406         }
1407
1408         if (rc != req->rq_bulk->bd_nob_transferred) {
1409                 CERROR ("Unexpected rc %d (%d transferred)\n",
1410                         rc, req->rq_bulk->bd_nob_transferred);
1411                 return (-EPROTO);
1412         }
1413
1414         if (rc < aa->aa_requested_nob)
1415                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1416
1417         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1418                 static int cksum_counter;
1419                 u32        server_cksum = body->oa.o_cksum;
1420                 char      *via = "";
1421                 char      *router = "";
1422                 cksum_type_t cksum_type;
1423
1424                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1425                                                body->oa.o_flags : 0);
1426                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1427                                                  aa->aa_ppga, OST_READ,
1428                                                  cksum_type);
1429
1430                 if (peer->nid != req->rq_bulk->bd_sender) {
1431                         via = " via ";
1432                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1433                 }
1434
1435                 if (server_cksum != client_cksum) {
1436                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1437                                            "%s%s%s inode "DFID" object "DOSTID
1438                                            " extent ["LPU64"-"LPU64"]\n",
1439                                            req->rq_import->imp_obd->obd_name,
1440                                            libcfs_nid2str(peer->nid),
1441                                            via, router,
1442                                            body->oa.o_valid & OBD_MD_FLFID ?
1443                                                 body->oa.o_parent_seq : (__u64)0,
1444                                            body->oa.o_valid & OBD_MD_FLFID ?
1445                                                 body->oa.o_parent_oid : 0,
1446                                            body->oa.o_valid & OBD_MD_FLFID ?
1447                                                 body->oa.o_parent_ver : 0,
1448                                            POSTID(&body->oa.o_oi),
1449                                            aa->aa_ppga[0]->off,
1450                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1451                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1452                                                                         1);
1453                         CERROR("client %x, server %x, cksum_type %x\n",
1454                                client_cksum, server_cksum, cksum_type);
1455                         cksum_counter = 0;
1456                         aa->aa_oa->o_cksum = client_cksum;
1457                         rc = -EAGAIN;
1458                 } else {
1459                         cksum_counter++;
1460                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1461                         rc = 0;
1462                 }
1463         } else if (unlikely(client_cksum)) {
1464                 static int cksum_missed;
1465
1466                 cksum_missed++;
1467                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1468                         CERROR("Checksum %u requested from %s but not sent\n",
1469                                cksum_missed, libcfs_nid2str(peer->nid));
1470         } else {
1471                 rc = 0;
1472         }
1473 out:
1474         if (rc >= 0)
1475                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1476                                      aa->aa_oa, &body->oa);
1477
1478         RETURN(rc);
1479 }
1480
/**
 * Rebuild and resend a BRW request that failed with a recoverable error.
 *
 * A new request is prepared from the async args of the failed one; the
 * oap and extent lists are moved (a list_head cannot simply be copied)
 * onto the new request, the resend is delayed based on aa_resends, and
 * the new request is queued on a ptlrpcd thread.
 *
 * \param request  the failed BRW request
 * \param aa       async args of \a request
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0        the new request was queued
 * \retval -EINTR   an async page was interrupted, redo abandoned
 * \retval negative error from osc_brw_prep_request()
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;
	ENTRY;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	/* Build a request of the same kind (read/write) over the same
	 * page array as the failed one. */
	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
					OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa,
				  NULL /* lsm unused by osc currently */,
				  aa->aa_page_count, aa->aa_ppga,
				  &new_req, aa->aa_ocapa, 0, 1);
	if (rc)
		RETURN(rc);

	/* Give up on the resend if any of the pages was interrupted while
	 * the old request was in flight. */
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				RETURN(-EINTR);
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
	else
		new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	/* Re-point each oap's request reference from the old request to
	 * the new one. */
	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* Capability ownership moves to the new async args. */
	new_aa->aa_ocapa = aa->aa_ocapa;
	aa->aa_ocapa = NULL;

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
1554
1555 /*
1556  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1557  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1558  * fine for our small page arrays and doesn't require allocation.  its an
1559  * insertion sort that swaps elements that are strides apart, shrinking the
1560  * stride down until its '1' and the array is sorted.
1561  */
1562 static void sort_brw_pages(struct brw_page **array, int num)
1563 {
1564         int stride, i, j;
1565         struct brw_page *tmp;
1566
1567         if (num == 1)
1568                 return;
1569         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1570                 ;
1571
1572         do {
1573                 stride /= 3;
1574                 for (i = stride ; i < num ; i++) {
1575                         tmp = array[i];
1576                         j = i;
1577                         while (j >= stride && array[j - stride]->off > tmp->off) {
1578                                 array[j] = array[j - stride];
1579                                 j -= stride;
1580                         }
1581                         array[j] = tmp;
1582                 }
1583         } while (stride > 1);
1584 }
1585
1586 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1587 {
1588         LASSERT(ppga != NULL);
1589         OBD_FREE(ppga, sizeof(*ppga) * count);
1590 }
1591
/**
 * Interpret callback for a BRW RPC, run when the request completes.
 *
 * Finalizes the reply via osc_brw_fini_request(), resends the RPC on
 * recoverable errors, updates the cached object attributes (blocks,
 * times, size/KMS for writes) on success, then finishes the covered
 * extents and releases all per-RPC resources.
 *
 * \param env   execution environment of the interpreting thread
 * \param req   the completed BRW request
 * \param data  struct osc_brw_async_args attached to \a req
 * \param rc    completion status from ptlrpc
 *
 * \retval 0 on success or when a redo was queued; negative errno
 *         otherwise.
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;
	ENTRY;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When server return -EINPROGRESS, client should always retry
	 * regardless of the number of times the bulk was resent already. */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			/* Import was evicted/reconnected since the request
			 * was sent: do not resend across the eviction. */
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: "
			       ""LPU64":"LPU64", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		/* rc == 0 means a redo request was queued; this RPC's
		 * resources now belong to the new request. */
		if (rc == 0)
			RETURN(0);
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (aa->aa_ocapa) {
		capa_put(aa->aa_ocapa);
		aa->aa_ocapa = NULL;
	}

	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		/* The pga was sorted by offset, so the last entry carries
		 * the highest file offset of this RPC. */
		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		/* Propagate the attributes returned by the server into
		 * the cached cl_object attributes. */
		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			loff_t last_off = last->oap_count + last->oap_obj_off +
				last->oap_page_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_update(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	OBDO_FREE(aa->aa_oa);

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	/* Finish every extent covered by this RPC with the final status. */
	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
			  req->rq_bulk->bd_nob_transferred);
	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	spin_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	spin_unlock(&cli->cl_loi_list_lock);

	/* An RPC slot just freed up; kick more IO if any is pending. */
	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
	RETURN(rc);
}
1715
1716 static void brw_commit(struct ptlrpc_request *req)
1717 {
1718         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1719          * this called via the rq_commit_cb, I need to ensure
1720          * osc_dec_unstable_pages is still called. Otherwise unstable
1721          * pages may be leaked. */
1722         spin_lock(&req->rq_lock);
1723         if (likely(req->rq_unstable)) {
1724                 req->rq_unstable = 0;
1725                 spin_unlock(&req->rq_lock);
1726
1727                 osc_dec_unstable_pages(req);
1728         } else {
1729                 req->rq_committed = 1;
1730                 spin_unlock(&req->rq_lock);
1731         }
1732 }
1733
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * \param env      execution environment of the caller
 * \param cli      client obd the RPC is built for
 * \param ext_list extents to cover with one RPC; on success their
 *                 ownership moves to the request, on failure they are
 *                 finished here with the error
 * \param cmd      OBD_BRW_READ or OBD_BRW_WRITE
 * \param pol      ptlrpcd placement policy for the async request
 *
 * \retval 0 on success, negative errno on failure.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
	struct ptlrpc_request           *req = NULL;
	struct osc_extent               *ext;
	struct brw_page                 **pga = NULL;
	struct osc_brw_async_args       *aa = NULL;
	struct obdo                     *oa = NULL;
	struct osc_async_page           *oap;
	struct osc_async_page           *tmp;
	struct cl_req                   *clerq = NULL;
	enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
	                                                              CRT_READ;
	struct cl_req_attr              *crattr = NULL;
	loff_t                          starting_offset = OBD_OBJECT_EOF;
	loff_t                          ending_offset = 0;
	int                             mpflag = 0;
	int                             mem_tight = 0;
	int                             page_count = 0;
	bool                            soft_sync = false;
	int                             i;
	int                             rc;
	struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
	struct ost_body                 *body;
	ENTRY;
	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			++page_count;
			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			/* Track the covered [starting_offset, ending_offset)
			 * range; interior pages must be full pages. */
			if (starting_offset == OBD_OBJECT_EOF ||
			    starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_CACHE_SIZE);
		}
	}

	soft_sync = osc_over_unstable_soft_limit(cli);
	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	OBD_ALLOC(crattr, sizeof(*crattr));
	if (crattr == NULL)
		GOTO(out, rc = -ENOMEM);

	OBD_ALLOC(pga, sizeof(*pga) * page_count);
	if (pga == NULL)
		GOTO(out, rc = -ENOMEM);

	OBDO_ALLOC(oa);
	if (oa == NULL)
		GOTO(out, rc = -ENOMEM);

	/* Fill the brw_page array and attach every page to the cl_req. */
	i = 0;
	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
		struct cl_page *page = oap2cl_page(oap);
		if (clerq == NULL) {
			clerq = cl_req_alloc(env, page, crt,
					     1 /* only 1-object rpcs for now */);
			if (IS_ERR(clerq))
				GOTO(out, rc = PTR_ERR(clerq));
		}
		if (mem_tight)
			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
		if (soft_sync)
			oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
		pga[i] = &oap->oap_brw_page;
		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
		       pga[i]->pg, page_index(oap->oap_page), oap,
		       pga[i]->flag);
		i++;
		cl_req_page_add(env, clerq, page);
	}

	/* always get the data for the obdo for the rpc */
	LASSERT(clerq != NULL);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, clerq, crattr, ~0ULL);

	rc = cl_req_prep(env, clerq);
	if (rc != 0) {
		CERROR("cl_req_prep failed: %d\n", rc);
		GOTO(out, rc);
	}

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
			pga, &req, crattr->cra_capa, 1, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		GOTO(out, rc);
	}

	req->rq_commit_cb = brw_commit;
	req->rq_interpret_reply = brw_interpret;

	if (mem_tight != 0)
		req->rq_memalloc = 1;

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST).  If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps.  Sadly, there is no obvious
	 * way to do this in a single call.  bug 10150 */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	crattr->cra_oa = &body->oa;
	cl_req_attr_set(env, clerq, crattr,
			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	/* Hand the oap/extent lists over to the request's async args. */
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);
	aa->aa_clerq = clerq;

	/* queued sync pages can be torn down while the pages
	 * were between the pending list and the rpc */
	tmp = NULL;
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		/* only one oap gets a request reference */
		if (tmp == NULL)
			tmp = oap;
		if (oap->oap_interrupted && !req->rq_intr) {
			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
					oap, req);
			ptlrpc_mark_interrupted(req);
		}
	}
	if (tmp != NULL)
		tmp->oap_request = ptlrpc_request_addref(req);

	/* Account the in-flight RPC and feed the lprocfs histograms. */
	spin_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_CACHE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);

	/* XXX: Maybe the caller can check the RPC bulk descriptor to
	 * see which CPU/NUMA node the majority of pages were allocated
	 * on, and try to assign the async RPC to the CPU core
	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
	 *
	 * But on the other hand, we expect that multiple ptlrpcd
	 * threads and the initial write sponsor can run in parallel,
	 * especially when data checksum is enabled, which is CPU-bound
	 * operation and single ptlrpcd thread cannot process in time.
	 * So more ptlrpcd threads sharing BRW load
	 * (with PDL_POLICY_ROUND) seems better.
	 */
	ptlrpcd_add_req(req, pol, -1);
	rc = 0;
	EXIT;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	if (crattr != NULL) {
		capa_put(crattr->cra_capa);
		OBD_FREE(crattr, sizeof(*crattr));
	}

	if (rc != 0) {
		LASSERT(req == NULL);

		if (oa)
			OBDO_FREE(oa);
		if (pga)
			OBD_FREE(pga, sizeof(*pga) * page_count);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
		if (clerq && !IS_ERR(clerq))
			cl_req_completion(env, clerq, rc);
	}
	RETURN(rc);
}
1952
1953 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1954                                         struct ldlm_enqueue_info *einfo)
1955 {
1956         void *data = einfo->ei_cbdata;
1957         int set = 0;
1958
1959         LASSERT(lock != NULL);
1960         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1961         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1962         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1963         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1964
1965         lock_res_and_lock(lock);
1966
1967         if (lock->l_ast_data == NULL)
1968                 lock->l_ast_data = data;
1969         if (lock->l_ast_data == data)
1970                 set = 1;
1971
1972         unlock_res_and_lock(lock);
1973
1974         return set;
1975 }
1976
1977 static int osc_set_data_with_check(struct lustre_handle *lockh,
1978                                    struct ldlm_enqueue_info *einfo)
1979 {
1980         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1981         int set = 0;
1982
1983         if (lock != NULL) {
1984                 set = osc_set_lock_data_with_check(lock, einfo);
1985                 LDLM_LOCK_PUT(lock);
1986         } else
1987                 CERROR("lockh %p, data %p - client evicted?\n",
1988                        lockh, einfo->ei_cbdata);
1989         return set;
1990 }
1991
/**
 * Complete an OSC lock enqueue once the final status is known.
 *
 * For intent enqueues aborted by the server, the real status is taken
 * from the intent reply.  The caller's \a upcall is then invoked with
 * the final error code, and the reference taken in ldlm_cli_enqueue()
 * is dropped for granted/matched locks.
 *
 * \param req     the enqueue request (used to read the intent reply)
 * \param upcall  completion callback supplied by the enqueue caller
 * \param cookie  opaque argument for \a upcall
 * \param lockh   handle of the (possibly) granted lock
 * \param mode    lock mode the enqueue reference was taken with
 * \param flags   in/out enqueue flags; LDLM_FL_LVB_READY may be added
 * \param agl     non-zero for speculative (AGL) enqueues
 * \param errcode enqueue status from ldlm
 *
 * \retval the upcall's return value.
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, ldlm_mode_t mode,
                            __u64 *flags, int agl, int errcode)
{
	bool intent = *flags & LDLM_FL_HAS_INTENT;
	int rc;
	ENTRY;

	/* The request was created before ldlm_cli_enqueue call. */
	if (intent && errcode == ELDLM_LOCK_ABORTED) {
		struct ldlm_reply *rep;

		rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
		LASSERT(rep != NULL);

		rep->lock_policy_res1 =
			ptlrpc_status_ntoh(rep->lock_policy_res1);
		if (rep->lock_policy_res1)
			errcode = rep->lock_policy_res1;
		if (!agl)
			*flags |= LDLM_FL_LVB_READY;
	} else if (errcode == ELDLM_OK) {
		*flags |= LDLM_FL_LVB_READY;
	}

	/* Call the update callback. */
	rc = (*upcall)(cookie, lockh, errcode);

	/* release the reference taken in ldlm_cli_enqueue() */
	if (errcode == ELDLM_LOCK_MATCHED)
		errcode = ELDLM_OK;
	if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
		ldlm_lock_decref(lockh, mode);

	RETURN(rc);
}
2029
/**
 * Interpret callback for an asynchronous lock enqueue.
 *
 * Finishes the ldlm side of the enqueue and then the osc side via
 * osc_enqueue_fini().  An extra lock reference is held across the
 * upcall so a blocking AST posted for a failed lock cannot run before
 * the upcall has completed.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle *lockh = &aa->oa_lockh;
	ldlm_mode_t mode = aa->oa_mode;
	struct ost_lvb *lvb = aa->oa_lvb;
	__u32 lvb_len = sizeof(*lvb);
	__u64 flags = 0;

	ENTRY;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(lockh);
	LASSERTF(lock != NULL,
		 "lockh "LPX64", req %p, aa %p - client evicted?\n",
		 lockh->cookie, req, aa);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(lockh, mode);

	/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

	/* Let CP AST to grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	/* AGL enqueues carry no lvb/flags pointers (the result is not
	 * interesting to the caller); use a local flags word instead. */
	if (aa->oa_agl) {
		LASSERT(aa->oa_lvb == NULL);
		LASSERT(aa->oa_flags == NULL);
		aa->oa_flags = &flags;
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
				   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
				   lockh, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
			      aa->oa_flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* Drop the extra reference taken above and the ldlm_handle2lock
	 * reference. */
	ldlm_lock_decref(lockh, mode);
	LDLM_LOCK_PUT(lock);
	RETURN(rc);
}
2082
2083 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2084
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct lustre_handle lockh = { 0 };
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	/* Matching an existing lock requires a valid LVB, except for
	 * speculative (AGL) enqueues. */
	__u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
	ldlm_mode_t mode;
	int rc;
	ENTRY;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother.  */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock.  The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, &lockh, 0);
	if (mode) {
		struct ldlm_lock *matched;

		if (*flags & LDLM_FL_TEST_LOCK)
			RETURN(ELDLM_OK);

		matched = ldlm_handle2lock(&lockh);
		if (agl) {
			/* AGL enqueues DLM locks speculatively. Therefore if
			 * it already exists a DLM lock, it will just inform the
			 * caller to cancel the AGL process for this stripe. */
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(-ECANCELED);
		} else if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;

			/* We already have a lock, and it's referenced. */
			(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(ELDLM_OK);
		} else {
			/* The matched lock's ast data belongs to someone
			 * else; fall through and enqueue a new lock. */
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

no_match:
	if (*flags & LDLM_FL_TEST_LOCK)
		RETURN(-ENOLCK);

	/* Intent enqueues need a request with room for the LVB reply. */
	if (intent) {
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			RETURN(-ENOMEM);

		rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
		if (rc < 0) {
			ptlrpc_request_free(req);
			RETURN(rc);
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof *lvb);
		ptlrpc_request_set_replen(req);
	}

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, &lockh, async);
	if (async) {
		if (!rc) {
			/* Stash the enqueue context; osc_enqueue_interpret()
			 * completes the enqueue when the reply arrives. */
			struct osc_enqueue_args *aa;
			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_exp    = exp;
			aa->oa_mode   = einfo->ei_mode;
			aa->oa_type   = einfo->ei_type;
			lustre_handle_copy(&aa->oa_lockh, &lockh);
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_agl    = !!agl;
			if (!agl) {
				aa->oa_flags  = flags;
				aa->oa_lvb    = lvb;
			} else {
				/* AGL is essentially to enqueue an DLM lock
				 * in advance, so we don't care about the
				 * result of AGL enqueue. */
				aa->oa_lvb    = NULL;
				aa->oa_flags  = NULL;
			}

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		RETURN(rc);
	}

	rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
			      flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	RETURN(rc);
}
2236
2237 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2238                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2239                    __u64 *flags, void *data, struct lustre_handle *lockh,
2240                    int unref)
2241 {
2242         struct obd_device *obd = exp->exp_obd;
2243         __u64 lflags = *flags;
2244         ldlm_mode_t rc;
2245         ENTRY;
2246
2247         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2248                 RETURN(-EIO);
2249
2250         /* Filesystem lock extents are extended to page boundaries so that
2251          * dealing with the page cache is a little smoother */
2252         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2253         policy->l_extent.end |= ~CFS_PAGE_MASK;
2254
2255         /* Next, search for already existing extent locks that will cover us */
2256         /* If we're trying to read, we also search for an existing PW lock.  The
2257          * VFS and page cache already protect us locally, so lots of readers/
2258          * writers can share a single PW lock. */
2259         rc = mode;
2260         if (mode == LCK_PR)
2261                 rc |= LCK_PW;
2262         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2263                              res_id, type, policy, rc, lockh, unref);
2264         if (rc) {
2265                 if (data != NULL) {
2266                         if (!osc_set_data_with_check(lockh, data)) {
2267                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2268                                         ldlm_lock_decref(lockh, rc);
2269                                 RETURN(0);
2270                         }
2271                 }
2272                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2273                         ldlm_lock_addref(lockh, LCK_PR);
2274                         ldlm_lock_decref(lockh, LCK_PW);
2275                 }
2276                 RETURN(rc);
2277         }
2278         RETURN(rc);
2279 }
2280
2281 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2282 {
2283         ENTRY;
2284
2285         if (unlikely(mode == LCK_GROUP))
2286                 ldlm_lock_decref_and_cancel(lockh, mode);
2287         else
2288                 ldlm_lock_decref(lockh, mode);
2289
2290         RETURN(0);
2291 }
2292
/*
 * Completion callback for an asynchronous OST_STATFS request: copy the
 * server reply into the caller's obd_statfs buffer and report the final
 * status through the oi_cb_up upcall.
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* NODELAY statfs is best-effort: a connection problem is reported
         * to the upcall as success, without filling in oi_osfs. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        /* the upcall always runs, even on error, so the caller can finish */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2325
/*
 * Send an OST_STATFS request asynchronously.  The request is added to
 * @rqset for the caller to drive; when the reply arrives,
 * osc_statfs_interpret() copies the data into oinfo->oi_osfs and invokes
 * oinfo->oi_cb_up.  @max_age is currently unused here (see the comment
 * below).
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* NOTE(review): statfs is routed to the create portal here, same as
         * in osc_statfs() -- presumably to avoid queuing behind bulk I/O */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs callers must not block on a stuck import (deadlock
                 * risk), so fail fast instead of waiting or resending */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2369
/*
 * Synchronous statfs: send an OST_STATFS request and wait for the reply,
 * copying the result into @osfs.  @max_age is currently unused here (see
 * the comment below).  OBD_STATFS_NODELAY in @flags makes the request
 * fail fast instead of waiting on a stuck import.
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* The request may also come from lprocfs, so serialize against
         * client_disconnect_export() tearing down cl_import, and take our
         * own reference on the import while the request is built
         * (bug 15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* the request holds what it needs; drop our import reference */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs callers must not block on a stuck import (deadlock
                 * risk), so fail fast instead of waiting or resending */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2433
/*
 * Dispatch OSC-specific ioctls (import recovery/activation, quota check
 * polling, target ping).  A module reference is held for the duration of
 * the call so the module cannot be unloaded mid-ioctl.
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* positive return values from recovery are not errors */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2473
/*
 * Handle obd_set_info_async() keys for the OSC.
 *
 * Keys handled locally with no RPC: KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET (attach to the shared client page cache,
 * once only) and KEY_CACHE_LRU_SHRINK.  Any other key is forwarded to
 * the OST in an OST_SET_INFO request: KEY_GRANT_SHRINK is queued on
 * ptlrpcd with osc_shrink_grant_interpret as the reply handler, all
 * remaining keys are added to the caller-supplied @set (which must then
 * be non-NULL).
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of this OSC's cached pages */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report back how many pages are still left to shrink */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant shrink carries an ost_body instead of a raw value buffer */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* keep a private copy of the obdo for the interpret callback,
                 * since @val belongs to the caller */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2592
2593 static int osc_reconnect(const struct lu_env *env,
2594                          struct obd_export *exp, struct obd_device *obd,
2595                          struct obd_uuid *cluuid,
2596                          struct obd_connect_data *data,
2597                          void *localdata)
2598 {
2599         struct client_obd *cli = &obd->u.cli;
2600
2601         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2602                 long lost_grant;
2603
2604                 spin_lock(&cli->cl_loi_list_lock);
2605                 data->ocd_grant = (cli->cl_avail_grant +
2606                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2607                                   2 * cli_brw_size(obd);
2608                 lost_grant = cli->cl_lost_grant;
2609                 cli->cl_lost_grant = 0;
2610                 spin_unlock(&cli->cl_loi_list_lock);
2611
2612                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2613                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2614                        data->ocd_version, data->ocd_grant, lost_grant);
2615         }
2616
2617         RETURN(0);
2618 }
2619
/*
 * Disconnect the client export.  The osc is removed from the grant
 * shrink list only after client_disconnect_export() has cleared the
 * import, to avoid the connect/cleanup race described below (bug 18662).
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2647
/*
 * React to state changes of the import backing this OSC: reset grant
 * accounting on disconnect, flush cached pages and clean up local locks
 * on invalidation, re-initialize grant and the request portal once
 * connect data arrives, and forward each event to the obd observer
 * (typically the LOV layer) via obd_notify_observer().
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* the server's grant is meaningless while disconnected */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2721
2722 /**
2723  * Determine whether the lock can be canceled before replaying the lock
2724  * during recovery, see bug16774 for detailed information.
2725  *
2726  * \retval zero the lock can't be canceled
2727  * \retval other ok to cancel
2728  */
2729 static int osc_cancel_weight(struct ldlm_lock *lock)
2730 {
2731         /*
2732          * Cancel all unused and granted extent lock.
2733          */
2734         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2735             lock->l_granted_mode == lock->l_req_mode &&
2736             osc_ldlm_weigh_ast(lock) == 0)
2737                 RETURN(1);
2738
2739         RETURN(0);
2740 }
2741
2742 static int brw_queue_work(const struct lu_env *env, void *data)
2743 {
2744         struct client_obd *cli = data;
2745
2746         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2747
2748         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2749         RETURN(0);
2750 }
2751
/*
 * Set up an OSC obd device: client import, ptlrpcd work items for
 * writeback and LRU shrinking, quota, procfs entries, a reserve request
 * pool for brw, and the grant-shrink list / lock-cancel-weight hooks.
 * On failure, everything initialized so far is torn down through the
 * out_* labels in reverse order.
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        /* proc failure is logged but not fatal to setup */
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        /* let the LDLM use osc_cancel_weight() to pick cancellable locks */
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2843
/*
 * Staged pre-cleanup hook called during obd teardown:
 * - OBD_CLEANUP_EARLY: deactivate the import and stop pinging it.
 * - OBD_CLEANUP_EXPORTS: destroy the ptlrpcd work items, clean up the
 *   client import and remove the procfs entries.
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
                }
        }
        RETURN(rc);
}
2889
/*
 * Final cleanup of an OSC obd device: detach from the shared client page
 * cache (LRU list), free the quota cache, release the client obd state
 * and drop the ptlrpcd reference taken in osc_setup().
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
2916
2917 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2918 {
2919         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2920         return rc > 0 ? 0: rc;
2921 }
2922
2923 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2924 {
2925         return osc_process_config_base(obd, buf);
2926 }
2927
/* OBD method table for the OSC; operations that need no OSC-specific
 * handling are delegated to the generic client_* helpers. */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
2952
/*
 * Module init: register the OSC slab caches and the "osc" obd type.
 * If the OSP module is already loaded and has registered a proc symlink
 * (typ_procsym), our own proc root is not created (enable_proc = false);
 * the corresponding attach happens in osc_setup().
 */
static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        int rc;
        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules.*/
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc) {
                /* undo lu_kmem_init() on registration failure */
                lu_kmem_fini(osc_caches);
                RETURN(rc);
        }

        RETURN(rc);
}
2982
/* Module exit: unregister the obd type, then release the slab caches
 * set up in osc_init() (reverse order of initialization). */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
2988
/* Kernel module metadata; cfs_module() wires up the init/exit handlers. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);