Whamcloud - gitweb
4e3a10ce0876d7ba65444e8225bc0eaef51cd647
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-request state for an asynchronous bulk read/write (BRW) RPC; stored
 * in the request's rq_async_args and consumed by brw_interpret(). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;           /* object attributes for this I/O */
        int                       aa_requested_nob; /* number of bytes requested — TODO confirm against caller */
        int                       aa_nio_count;    /* niobuf/remote-buffer count — presumably; verify */
        obd_count                 aa_page_count;   /* entries in aa_ppga */
        int                       aa_resends;      /* resend attempts so far */
        struct brw_page **aa_ppga;                 /* array of pages in this transfer */
        struct client_obd        *aa_cli;          /* owning client obd */
        struct list_head          aa_oaps;         /* async pages covered by this RPC */
        struct list_head          aa_exts;         /* osc extents covered by this RPC */
        struct obd_capa  *aa_ocapa;                /* capability, if any */
        struct cl_req            *aa_clerq;        /* cl_req for this transfer */
};
68
69 #define osc_grant_args osc_brw_async_args
70
/* Async-args for OST_SETATTR/OST_PUNCH; filled by the senders and read
 * back in osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;      /* obdo updated from the reply */
        obd_enqueue_update_f     sa_upcall;  /* completion callback */
        void                    *sa_cookie;  /* opaque argument for sa_upcall */
};
76
/* Async-args for OST_SYNC; consumed by osc_sync_interpret(). */
struct osc_fsync_args {
        struct obd_info *fa_oi;              /* obd_info whose oi_oa gets the reply obdo */
        obd_enqueue_update_f     fa_upcall;  /* completion callback */
        void                    *fa_cookie;  /* opaque argument for fa_upcall */
};
82
/* Async-args for a DLM lock enqueue issued by the OSC. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;     /* export the enqueue was sent on */
        ldlm_type_t             oa_type;     /* lock type (e.g. extent) */
        ldlm_mode_t             oa_mode;     /* requested lock mode */
        __u64                   *oa_flags;   /* in/out LDLM flags */
        osc_enqueue_upcall_f    oa_upcall;   /* completion callback */
        void                    *oa_cookie;  /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;     /* lock value block from the server */
        struct lustre_handle    oa_lockh;    /* handle of the granted lock */
        unsigned int            oa_agl:1;    /* asynchronous glimpse lock request */
};
94
95 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
97                          void *data, int rc);
98
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100                                  struct ost_body *body, void *capa)
101 {
102         struct obd_capa *oc = (struct obd_capa *)capa;
103         struct lustre_capa *c;
104
105         if (!capa)
106                 return;
107
108         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
109         LASSERT(c);
110         capa_cpy(c, oc);
111         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112         DEBUG_CAPA(D_SEC, c, "pack");
113 }
114
/* Fill the request's OST body with the caller's obdo (converted to wire
 * format for this connection) and, if present, the capability in @oinfo. */
void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}
126
127 void osc_set_capa_size(struct ptlrpc_request *req,
128                        const struct req_msg_field *field,
129                        struct obd_capa *oc)
130 {
131         if (oc == NULL)
132                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
133         else
134                 /* it is already calculated as sizeof struct obd_capa */
135                 ;
136 }
137
/* Reply handler for an asynchronous OST_GETATTR: copy the returned
 * attributes into the caller's obdo and deliver the final status through
 * the oi_cb_up completion callback.  Returns the callback's result. */
int osc_getattr_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req,
                          struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                /* reply without an OST body: protocol error */
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
166
/* Synchronous OST_GETATTR: pack @oinfo into a request, wait for the reply
 * and copy the returned attributes back into oinfo->oi_oa.
 * Returns 0 on success or a negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* capability buffer must be sized before packing the request */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* blksize is filled in locally from the negotiated BRW size */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
210
/* Synchronous OST_SETATTR: send the attributes in oinfo->oi_oa to the OST
 * and copy the server's view of the object back on success.
 * Returns 0 or a negative errno. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        /* caller must have set the object group in the obdo */
        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
252
/* Reply handler shared by async OST_SETATTR and OST_PUNCH (see
 * osc_punch_base): copy the returned obdo into sa_oa and forward the
 * final status to the caller's upcall.  Returns the upcall's result. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
273
/* Send an OST_SETATTR asynchronously.  When @rqset is NULL the request is
 * handed to ptlrpcd fire-and-forget (no reply interpretation); otherwise
 * @upcall(@cookie, rc) is invoked from osc_setattr_interpret() on
 * completion.  Returns 0 on successful queueing or a negative errno. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* forward the llog cookie from the transaction info, if present */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                /* per-request async-args area must be large enough */
                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
324
/* Async setattr using oinfo's own oi_cb_up as the completion upcall. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}
332
/* Synchronous OST_CREATE.  The LASSERT below restricts this path to
 * echo-client sequences; on success @oa is refreshed from the reply and,
 * when @oti is supplied, the unlink llog cookie is saved into it.
 * Returns 0 or a negative errno. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* blksize is filled in locally from the negotiated BRW size */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* preserve the returned unlink llog cookie for the caller */
        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
400
/* Send an OST_PUNCH (truncate/punch) asynchronously.  The affected range
 * travels in oinfo->oi_oa (packed as a wire obdo); the reply is handled by
 * osc_setattr_interpret(), which invokes @upcall(@cookie, rc).
 * Returns 0 on successful queueing or a negative errno. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* punch replies carry an obdo, so reuse the setattr interpreter */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
445
/* Reply handler for OST_SYNC: copy the returned obdo into the caller's
 * obd_info and forward the status to the fsync upcall. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
468
/* Send an OST_SYNC asynchronously.  The byte range to sync is carried in
 * the size/blocks fields of oinfo->oi_oa (see comment below); completion
 * is delivered to @upcall(@cookie) via osc_sync_interpret().
 * Returns 0 on successful queueing or a negative errno. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        /* per-request async-args area must be large enough */
        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
513
/* Find and cancel locally the locks matched by @mode on the resource
 * derived from @oa's object id.  Found locks are added to the @cancels
 * list; returns the number of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        /* hold a reference across the local cancel scan */
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
548
549 static int osc_destroy_interpret(const struct lu_env *env,
550                                  struct ptlrpc_request *req, void *data,
551                                  int rc)
552 {
553         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
554
555         atomic_dec(&cli->cl_destroy_in_flight);
556         wake_up(&cli->cl_destroy_waitq);
557         return 0;
558 }
559
/* Try to reserve a slot for a destroy RPC.  Returns 1 when the in-flight
 * counter was raised within cl_max_rpcs_in_flight (caller may send);
 * returns 0 otherwise, after undoing the increment, and the caller must
 * wait on cl_destroy_waitq. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
577
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct obd_trans_info *oti)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel cached PW locks on the object in advance (ELC) */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, NULL);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* If osc_destory is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        /* NOTE(review): o_flags is tested without first checking
         * OBD_MD_FLFLAGS in o_valid — confirm callers always initialize
         * o_flags before reaching here. */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
654
/* Fill @oa's dirty/grant accounting fields (o_dirty, o_undirty, o_grant,
 * o_dropped) from the client state, under cl_loi_list_lock, so the server
 * can adjust this client's grant.  Also consumes cl_lost_grant. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                /* impossible accounting value: report and disable undirty */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
703
/* Schedule the next grant-shrink check cl_grant_shrink_interval from now. */
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
711
/* Add @grant bytes to the client's available grant, under the LOI lock. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
718
/* Apply any grant the server returned in @body to the client's total. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}
726
727 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
728                               obd_count keylen, void *key, obd_count vallen,
729                               void *val, struct ptlrpc_request_set *set);
730
/* Reply handler for a grant-shrink set_info RPC: on failure restore the
 * grant we attempted to return, otherwise account whatever the server
 * granted back.  Always frees the obdo allocated for the request. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* shrink failed: the grant is still ours, re-add it */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
751
/* Move a quarter of cl_avail_grant into @oa and tag the obdo with
 * OBD_FL_SHRINK_GRANT, then reschedule the next shrink check. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
765
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        /* already at the in-flight limit: drop to a single RPC's worth */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
782
/* Shrink the client's available grant down to @target_bytes, returning the
 * difference to the server through a KEY_GRANT_SHRINK set_info RPC.  The
 * target is clamped to at least one RPC's worth; a no-op if we already
 * hold no more than the target.  Returns 0 or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): cl_avail_grant may have changed while the lock was
         * dropped above — the recheck here recomputes the delta under lock. */
        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* RPC failed: take the grant back */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
827
828 static int osc_should_shrink_grant(struct client_obd *client)
829 {
830         cfs_time_t time = cfs_time_current();
831         cfs_time_t next_shrink = client->cl_next_shrink_grant;
832
833         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
834              OBD_CONNECT_GRANT_SHRINK) == 0)
835                 return 0;
836
837         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
838                 /* Get the current RPC size directly, instead of going via:
839                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
840                  * Keep comment here so that it can be found by searching. */
841                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
842
843                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
844                     client->cl_avail_grant > brw_size)
845                         return 1;
846                 else
847                         osc_update_next_shrink(client);
848         }
849         return 0;
850 }
851
852 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
853 {
854         struct client_obd *client;
855
856         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
857                 if (osc_should_shrink_grant(client))
858                         osc_shrink_grant(client);
859         }
860         return 0;
861 }
862
863 static int osc_add_shrink_grant(struct client_obd *client)
864 {
865         int rc;
866
867         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
868                                        TIMEOUT_GRANT,
869                                        osc_grant_shrink_grant_cb, NULL,
870                                        &client->cl_grant_shrink_list);
871         if (rc) {
872                 CERROR("add grant client %s error %d\n",
873                         client->cl_import->imp_obd->obd_name, rc);
874                 return rc;
875         }
876         CDEBUG(D_CACHE, "add grant client %s \n",
877                client->cl_import->imp_obd->obd_name);
878         osc_update_next_shrink(client);
879         return 0;
880 }
881
882 static int osc_del_shrink_grant(struct client_obd *client)
883 {
884         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
885                                          TIMEOUT_GRANT);
886 }
887
/**
 * Initialize this client's grant accounting from the connect reply and,
 * if the server supports it, register for periodic grant shrinking.
 *
 * \param cli client obd being (re)connected
 * \param ocd connect data returned by the server (ocd_grant, ocd_blocksize,
 *            ocd_connect_flags are consumed here)
 */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        /* a negative result means the server granted less than we already
         * have dirty; treat the whole ocd_grant as available instead */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* only register once: list_empty() guards against a reconnect
         * adding the client to the shrink list a second time */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
927
928 /* We assume that the reason this OSC got a short read is because it read
929  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
930  * via the LOV, and it _knows_ it's reading inside the file, it's just that
931  * this stripe never got written at or beyond this stripe offset yet. */
932 static void handle_short_read(int nob_read, obd_count page_count,
933                               struct brw_page **pga)
934 {
935         char *ptr;
936         int i = 0;
937
938         /* skip bytes read OK */
939         while (nob_read > 0) {
940                 LASSERT (page_count > 0);
941
942                 if (pga[i]->count > nob_read) {
943                         /* EOF inside this page */
944                         ptr = kmap(pga[i]->pg) +
945                                 (pga[i]->off & ~CFS_PAGE_MASK);
946                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
947                         kunmap(pga[i]->pg);
948                         page_count--;
949                         i++;
950                         break;
951                 }
952
953                 nob_read -= pga[i]->count;
954                 page_count--;
955                 i++;
956         }
957
958         /* zero remaining pages */
959         while (page_count-- > 0) {
960                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
961                 memset(ptr, 0, pga[i]->count);
962                 kunmap(pga[i]->pg);
963                 i++;
964         }
965 }
966
967 static int check_write_rcs(struct ptlrpc_request *req,
968                            int requested_nob, int niocount,
969                            obd_count page_count, struct brw_page **pga)
970 {
971         int     i;
972         __u32   *remote_rcs;
973
974         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
975                                                   sizeof(*remote_rcs) *
976                                                   niocount);
977         if (remote_rcs == NULL) {
978                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
979                 return(-EPROTO);
980         }
981
982         /* return error if any niobuf was in error */
983         for (i = 0; i < niocount; i++) {
984                 if ((int)remote_rcs[i] < 0)
985                         return(remote_rcs[i]);
986
987                 if (remote_rcs[i] != 0) {
988                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
989                                 i, remote_rcs[i], req);
990                         return(-EPROTO);
991                 }
992         }
993
994         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
995                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
996                        req->rq_bulk->bd_nob_transferred, requested_nob);
997                 return(-EPROTO);
998         }
999
1000         return (0);
1001 }
1002
1003 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1004 {
1005         if (p1->flag != p2->flag) {
1006                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1007                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1008                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1009
1010                 /* warn if we try to combine flags that we don't know to be
1011                  * safe to combine */
1012                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1013                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1014                               "report this at https://jira.hpdd.intel.com/\n",
1015                               p1->flag, p2->flag);
1016                 }
1017                 return 0;
1018         }
1019
1020         return (p1->off + p1->count == p2->off);
1021 }
1022
/**
 * Compute the bulk checksum over the first \a nob bytes spread across the
 * \a pga page array, using the hash algorithm selected by \a cksum_type.
 * Contains two OBD_FAIL fault-injection points used by sanity tests: one
 * corrupts receive-side data (reads), one skews the sent checksum (writes).
 *
 * NOTE(review): on hash-init failure this returns PTR_ERR() through the
 * unsigned obd_count return type, so callers cannot distinguish it from a
 * valid checksum value — TODO confirm this matches callers' expectations.
 */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* only the first 'nob' bytes of the last page are hashed */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                /* NB nob is decremented by the full page count, so it can go
                 * negative on the final partial page and end the loop */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        /* NOTE(review): 'err' from cfs_crypto_hash_final is never checked;
         * a failed finalization would leave 'cksum' unreliable — verify */
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1077
/**
 * Build (but do not send) a bulk read or write RPC covering \a page_count
 * pages.  Adjacent, flag-compatible pages are coalesced into shared remote
 * niobufs; checksums are computed for writes (and requested for reads) when
 * enabled and not already covered by a secure bulk flavor.
 *
 * \param cmd        OBD_BRW_WRITE bit selects a write, otherwise read
 * \param cli        client obd whose import carries the RPC
 * \param oa         object attributes; also records checksum info for replay
 * \param lsm        unused by osc at present
 * \param page_count number of entries in \a pga; must be > 0
 * \param pga        page array, sorted by offset with no interior gaps
 * \param reqp       on success receives the prepared request
 * \param ocapa      object capability, may be NULL
 * \param reserve    nonzero to stash a capa reference in the async args
 * \param resend     nonzero to tag the RPC with OBD_FL_RECOV_RESEND
 *
 * \retval 0 on success (\a *reqp set), negative errno on failure
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the preallocated request pool so that dirty
         * pages can always be flushed even under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count the remote niobufs needed after merging contiguous pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* pin every page into the bulk descriptor, merging contiguous pages
         * into the previous niobuf where can_merge_pages() allows */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* opportunistically piggy-back a grant shrink on this RPC */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* reads: ask the server to checksum what it sends back */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* record everything the interpret/redo callbacks will need */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1284
/**
 * Compare the server-reported write checksum against the one the client
 * computed at send time; on mismatch, recompute over the (still pinned)
 * pages to diagnose where the data changed.
 *
 * \retval 0 when the checksums agree, 1 on mismatch (caller resends)
 */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the algorithm the server actually used */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        /* classify the failure: algorithm mismatch, client-side change
         * (typically concurrent mmap writes), or in-transit corruption */
        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1329
/* Note rc enters this function as number of bytes transferred */
/**
 * Interpret a completed BRW RPC: update quota flags and grant from the
 * reply, verify checksums and per-niobuf return codes (writes) or handle
 * short reads and verify the read checksum (reads), then copy the reply
 * obdo back into the caller's obdo.
 *
 * \retval 0 on success, -EAGAIN to request a resend (checksum or bulk
 *         unwrap failure), other negative errno on unrecoverable error
 */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT failures still carry a reply body worth processing below */
        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        /* the -EDQUOT case exits here, after quota/grant bookkeeping */
        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                /* checksum mismatch triggers a full resend via -EAGAIN */
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero out the pages past EOF (sparse-file semantics) */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                u32        server_cksum = body->oa.o_cksum;
                char      *via = "";
                char      *router = "";
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
                                               body->oa.o_flags : 0);
                /* recompute locally over only the bytes actually received */
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* note the intermediate router when the bulk was forwarded */
                if (peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent ["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                /* rate-limited: log only at power-of-two miss counts */
                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}
1479
/**
 * Rebuild and resend a BRW RPC after a recoverable error.
 *
 * A brand new request is prepared from the async args of the failed one;
 * the page array (ppga), the oap and extent lists, and the capability are
 * all transferred to the new request, which is then handed to ptlrpcd.
 *
 * \param request  the failed BRW request
 * \param aa       async args of \a request (pages, extents, resend count)
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0       new request queued successfully
 * \retval -EINTR  an async page was interrupted; new request is dropped
 * \retval negative other error from osc_brw_prep_request()
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are routine, log them quietly */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        /* Build a fresh RPC of the same kind (read/write) over the same
         * pages; last two args (0, 1) differ from the original send path. */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* Bail out (dropping the new request) if any page holding a request
         * reference was interrupted while the RPC was in flight. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* Move (not copy) the oap and extent lists onto the new args. */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* Swap the per-oap request reference from the old RPC to the new. */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* Capability ownership moves to the new request as well. */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1553
1554 /*
1555  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1556  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1557  * fine for our small page arrays and doesn't require allocation.  its an
1558  * insertion sort that swaps elements that are strides apart, shrinking the
1559  * stride down until its '1' and the array is sorted.
1560  */
1561 static void sort_brw_pages(struct brw_page **array, int num)
1562 {
1563         int stride, i, j;
1564         struct brw_page *tmp;
1565
1566         if (num == 1)
1567                 return;
1568         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1569                 ;
1570
1571         do {
1572                 stride /= 3;
1573                 for (i = stride ; i < num ; i++) {
1574                         tmp = array[i];
1575                         j = i;
1576                         while (j >= stride && array[j - stride]->off > tmp->off) {
1577                                 array[j] = array[j - stride];
1578                                 j -= stride;
1579                         }
1580                         array[j] = tmp;
1581                 }
1582         } while (stride > 1);
1583 }
1584
/* Free the brw_page pointer array allocated by osc_build_rpc(); @count must
 * equal the allocated element count.  Only the array itself is freed — the
 * brw_page structs it points at are embedded in the oaps (see
 * pga[i] = &oap->oap_brw_page in osc_build_rpc()). */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1590
/**
 * Interpret callback for a BRW RPC.
 *
 * Finalizes the reply (checksums etc. via osc_brw_fini_request()), resends
 * the RPC on recoverable errors, propagates server-returned attributes
 * (blocks/times, and size/KMS for writes) into the cl_object, then releases
 * the per-RPC resources and decrements the in-flight counters.
 *
 * \retval 0 on success or when a redo request took over; negative errno
 *         otherwise (the error is also passed to osc_extent_finish()).
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import generation changed (eviction/reconnect):
                         * do not redo, just report and fall through */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0: a redo request was queued and now owns the pages,
                 * extents and capability — nothing more to do here */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* pga was sorted by offset, so the last entry is the page
                 * with the highest file offset in this RPC */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* copy attributes the server marked valid into the object */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        /* finish all extents of this RPC with the final status */
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1714
1715 static void brw_commit(struct ptlrpc_request *req)
1716 {
1717         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1718          * this called via the rq_commit_cb, I need to ensure
1719          * osc_dec_unstable_pages is still called. Otherwise unstable
1720          * pages may be leaked. */
1721         spin_lock(&req->rq_lock);
1722         if (likely(req->rq_unstable)) {
1723                 req->rq_unstable = 0;
1724                 spin_unlock(&req->rq_lock);
1725
1726                 osc_dec_unstable_pages(req);
1727         } else {
1728                 req->rq_committed = 1;
1729                 spin_unlock(&req->rq_lock);
1730         }
1731 }
1732
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success the request is queued to ptlrpcd with policy \a pol and the
 * extents/pages are owned by the request's async args.  On failure every
 * extent is finished with the error and all local allocations are freed.
 *
 * \param env      execution environment
 * \param cli      client obd the RPC is sent through
 * \param ext_list list of osc_extents to write/read; emptied on success
 * \param cmd      OBD_BRW_WRITE or read
 * \param pol      ptlrpcd scheduling policy
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the byte range covered by this RPC; only the
                         * first and last pages may be partial */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill the brw_page array and attach each page to the cl_req */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        /* allocate the cl_req lazily from the first page */
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        /* sort by offset so the target allocates disk blocks in order */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* hand pages and extents over to the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* account the new in-flight RPC and update lprocfs histograms */
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
1950
1951 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1952                                         struct ldlm_enqueue_info *einfo)
1953 {
1954         void *data = einfo->ei_cbdata;
1955         int set = 0;
1956
1957         LASSERT(lock != NULL);
1958         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1959         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1960         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1961         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1962
1963         lock_res_and_lock(lock);
1964
1965         if (lock->l_ast_data == NULL)
1966                 lock->l_ast_data = data;
1967         if (lock->l_ast_data == data)
1968                 set = 1;
1969
1970         unlock_res_and_lock(lock);
1971
1972         return set;
1973 }
1974
1975 static int osc_set_data_with_check(struct lustre_handle *lockh,
1976                                    struct ldlm_enqueue_info *einfo)
1977 {
1978         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1979         int set = 0;
1980
1981         if (lock != NULL) {
1982                 set = osc_set_lock_data_with_check(lock, einfo);
1983                 LDLM_LOCK_PUT(lock);
1984         } else
1985                 CERROR("lockh %p, data %p - client evicted?\n",
1986                        lockh, einfo->ei_cbdata);
1987         return set;
1988 }
1989
/**
 * Finish an OST lock enqueue: decode the intent reply (if any), run the
 * caller's upcall with the final status, and drop the reference taken by
 * ldlm_cli_enqueue().
 *
 * \param req     the enqueue request
 * \param upcall  completion callback invoked with \a cookie and the status
 * \param cookie  opaque argument for \a upcall
 * \param lockh   handle of the enqueued lock
 * \param mode    lock mode used for the final decref
 * \param flags   enqueue flags; LDLM_FL_LVB_READY is set here when the LVB
 *                is valid for the caller (granted, or aborted-intent non-AGL)
 * \param agl     non-zero for asynchronous glimpse locks
 * \param errcode result of the enqueue (ELDLM_* or negative errno)
 *
 * \retval the upcall's return value
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, ldlm_mode_t mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* intent result is carried in lock_policy_res1, in network
                 * byte order status encoding — convert before use */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
2027
/**
 * Interpret callback for an async lock enqueue: complete the ldlm side of
 * the enqueue (ldlm_cli_enqueue_fini) and then the osc side
 * (osc_enqueue_fini), holding an extra lock reference across the upcall.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        ldlm_mode_t mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl) {
                /* AGL enqueues carry no lvb/flags from the caller; supply a
                 * local flags word for the fini calls below */
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the extra reference taken above */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2080
2081 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2082
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL matches ignore LVB readiness; regular matches require it */
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * it already exists a DLM lock, it wll just inform the
                         * caller to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* lock belongs to someone else's ast data; drop the
                         * match and fall through to a fresh enqueue */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        if (intent) {
                /* intent enqueue needs a request with room for the LVB reply */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* queue the request; osc_enqueue_interpret() will run
                         * the upcall when the reply arrives */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* synchronous path: finish the enqueue right here */
        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2234
/**
 * Search the local namespace for an already-granted DLM lock covering the
 * extent in \a policy, without issuing an enqueue RPC.
 *
 * The extent is first rounded out to page boundaries.  A read (LCK_PR)
 * request may also be satisfied by an existing LCK_PW lock, since the VFS
 * and page cache already provide the necessary local protection.
 *
 * \param exp    client export whose namespace is searched
 * \param res_id resource (object) the lock must belong to
 * \param type   lock type (extent locks for OSC)
 * \param policy extent to be covered; modified in place by page alignment
 * \param mode   requested mode; LCK_PR implicitly means "PR or PW"
 * \param flags  match flags; LDLM_FL_TEST_LOCK checks for existence only
 *               and leaves no reference on the matched lock
 * \param data   optional opaque data to attach to the matched lock
 * \param lockh  on success, filled with the handle of the matched lock
 * \param unref  also match locks with no reader/writer references
 *               (passed through to ldlm_lock_match())
 *
 * \retval 0     no suitable lock found (or attaching \a data failed)
 * \retval mode  mode of the matched lock otherwise
 * \retval -EIO  under OBD_FAIL_OSC_MATCH fault injection
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   __u64 *flags, void *data, struct lustre_handle *lockh,
                   int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        /* Attach the caller's data; if that fails, drop the
                         * reference taken by the match (no decref needed for
                         * test-only matches) and report "not found". */
                        if (!osc_set_data_with_check(lockh, data)) {
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        /* A PR request matched a PW lock: convert the held
                         * reference from PW to the originally requested PR
                         * mode so the caller's later decref balances. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}
2278
2279 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2280 {
2281         ENTRY;
2282
2283         if (unlikely(mode == LCK_GROUP))
2284                 ldlm_lock_decref_and_cancel(lockh, mode);
2285         else
2286                 ldlm_lock_decref(lockh, mode);
2287
2288         RETURN(0);
2289 }
2290
2291 static int osc_statfs_interpret(const struct lu_env *env,
2292                                 struct ptlrpc_request *req,
2293                                 struct osc_async_args *aa, int rc)
2294 {
2295         struct obd_statfs *msfs;
2296         ENTRY;
2297
2298         if (rc == -EBADR)
2299                 /* The request has in fact never been sent
2300                  * due to issues at a higher level (LOV).
2301                  * Exit immediately since the caller is
2302                  * aware of the problem and takes care
2303                  * of the clean up */
2304                  RETURN(rc);
2305
2306         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2307             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2308                 GOTO(out, rc = 0);
2309
2310         if (rc != 0)
2311                 GOTO(out, rc);
2312
2313         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2314         if (msfs == NULL) {
2315                 GOTO(out, rc = -EPROTO);
2316         }
2317
2318         *aa->aa_oi->oi_osfs = *msfs;
2319 out:
2320         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2321         RETURN(rc);
2322 }
2323
2324 static int osc_statfs_async(struct obd_export *exp,
2325                             struct obd_info *oinfo, __u64 max_age,
2326                             struct ptlrpc_request_set *rqset)
2327 {
2328         struct obd_device     *obd = class_exp2obd(exp);
2329         struct ptlrpc_request *req;
2330         struct osc_async_args *aa;
2331         int                    rc;
2332         ENTRY;
2333
2334         /* We could possibly pass max_age in the request (as an absolute
2335          * timestamp or a "seconds.usec ago") so the target can avoid doing
2336          * extra calls into the filesystem if that isn't necessary (e.g.
2337          * during mount that would help a bit).  Having relative timestamps
2338          * is not so great if request processing is slow, while absolute
2339          * timestamps are not ideal because they need time synchronization. */
2340         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2341         if (req == NULL)
2342                 RETURN(-ENOMEM);
2343
2344         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2345         if (rc) {
2346                 ptlrpc_request_free(req);
2347                 RETURN(rc);
2348         }
2349         ptlrpc_request_set_replen(req);
2350         req->rq_request_portal = OST_CREATE_PORTAL;
2351         ptlrpc_at_set_req_timeout(req);
2352
2353         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2354                 /* procfs requests not want stat in wait for avoid deadlock */
2355                 req->rq_no_resend = 1;
2356                 req->rq_no_delay = 1;
2357         }
2358
2359         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2360         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2361         aa = ptlrpc_req_async_args(req);
2362         aa->aa_oi = oinfo;
2363
2364         ptlrpc_set_add_req(rqset, req);
2365         RETURN(0);
2366 }
2367
2368 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2369                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2370 {
2371         struct obd_device     *obd = class_exp2obd(exp);
2372         struct obd_statfs     *msfs;
2373         struct ptlrpc_request *req;
2374         struct obd_import     *imp = NULL;
2375         int rc;
2376         ENTRY;
2377
2378         /*Since the request might also come from lprocfs, so we need
2379          *sync this with client_disconnect_export Bug15684*/
2380         down_read(&obd->u.cli.cl_sem);
2381         if (obd->u.cli.cl_import)
2382                 imp = class_import_get(obd->u.cli.cl_import);
2383         up_read(&obd->u.cli.cl_sem);
2384         if (!imp)
2385                 RETURN(-ENODEV);
2386
2387         /* We could possibly pass max_age in the request (as an absolute
2388          * timestamp or a "seconds.usec ago") so the target can avoid doing
2389          * extra calls into the filesystem if that isn't necessary (e.g.
2390          * during mount that would help a bit).  Having relative timestamps
2391          * is not so great if request processing is slow, while absolute
2392          * timestamps are not ideal because they need time synchronization. */
2393         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2394
2395         class_import_put(imp);
2396
2397         if (req == NULL)
2398                 RETURN(-ENOMEM);
2399
2400         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2401         if (rc) {
2402                 ptlrpc_request_free(req);
2403                 RETURN(rc);
2404         }
2405         ptlrpc_request_set_replen(req);
2406         req->rq_request_portal = OST_CREATE_PORTAL;
2407         ptlrpc_at_set_req_timeout(req);
2408
2409         if (flags & OBD_STATFS_NODELAY) {
2410                 /* procfs requests not want stat in wait for avoid deadlock */
2411                 req->rq_no_resend = 1;
2412                 req->rq_no_delay = 1;
2413         }
2414
2415         rc = ptlrpc_queue_wait(req);
2416         if (rc)
2417                 GOTO(out, rc);
2418
2419         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2420         if (msfs == NULL) {
2421                 GOTO(out, rc = -EPROTO);
2422         }
2423
2424         *osfs = *msfs;
2425
2426         EXIT;
2427  out:
2428         ptlrpc_req_finished(req);
2429         return rc;
2430 }
2431
2432 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2433                          void *karg, void *uarg)
2434 {
2435         struct obd_device *obd = exp->exp_obd;
2436         struct obd_ioctl_data *data = karg;
2437         int err = 0;
2438         ENTRY;
2439
2440         if (!try_module_get(THIS_MODULE)) {
2441                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2442                        module_name(THIS_MODULE));
2443                 return -EINVAL;
2444         }
2445         switch (cmd) {
2446         case OBD_IOC_CLIENT_RECOVER:
2447                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2448                                             data->ioc_inlbuf1, 0);
2449                 if (err > 0)
2450                         err = 0;
2451                 GOTO(out, err);
2452         case IOC_OSC_SET_ACTIVE:
2453                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2454                                                data->ioc_offset);
2455                 GOTO(out, err);
2456         case OBD_IOC_POLL_QUOTACHECK:
2457                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2458                 GOTO(out, err);
2459         case OBD_IOC_PING_TARGET:
2460                 err = ptlrpc_obd_ping(obd);
2461                 GOTO(out, err);
2462         default:
2463                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2464                        cmd, current_comm());
2465                 GOTO(out, err = -ENOTTY);
2466         }
2467 out:
2468         module_put(THIS_MODULE);
2469         return err;
2470 }
2471
/**
 * Handle a set_info_async request from an upper layer (normally LOV).
 *
 * A number of keys are handled entirely on the client: checksum enable,
 * sptlrpc configuration/context flush, LRU cache registration and LRU
 * shrinking.  Everything else is packed into an OST_SET_INFO RPC and sent
 * to the OST.  KEY_GRANT_SHRINK requests are queued on the ptlrpcd set;
 * all other RPCs go onto the caller-supplied \a set.
 *
 * \retval 0        success (or the RPC was queued)
 * \retval -EINVAL  bad value size, or a remote key without a request set
 * \retval -ENOMEM  request or obdo allocation failed
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* Toggle client-side data checksumming; handled locally. */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Attach this OSC to the shared client page cache and its LRU. */
        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        /* Shrink this OSC's share of the LRU: try to release at most half
         * of its in-list pages (capped by the requested target) and report
         * how many remain to be shrunk via *val. */
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        /* Remote keys (except grant shrink) need a set to put the RPC on. */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        /* The grant-shrink format carries an ost_body instead of a
         * variable-sized value buffer, so no size to set for it. */
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        /* Grant shrink gets an interpret callback; it needs its own copy
         * of the obdo because \a val belongs to the caller. */
        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2589
2590 static int osc_reconnect(const struct lu_env *env,
2591                          struct obd_export *exp, struct obd_device *obd,
2592                          struct obd_uuid *cluuid,
2593                          struct obd_connect_data *data,
2594                          void *localdata)
2595 {
2596         struct client_obd *cli = &obd->u.cli;
2597
2598         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2599                 long lost_grant;
2600
2601                 spin_lock(&cli->cl_loi_list_lock);
2602                 data->ocd_grant = (cli->cl_avail_grant +
2603                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2604                                   2 * cli_brw_size(obd);
2605                 lost_grant = cli->cl_lost_grant;
2606                 cli->cl_lost_grant = 0;
2607                 spin_unlock(&cli->cl_loi_list_lock);
2608
2609                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2610                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2611                        data->ocd_version, data->ocd_grant, lost_grant);
2612         }
2613
2614         RETURN(0);
2615 }
2616
2617 static int osc_disconnect(struct obd_export *exp)
2618 {
2619         struct obd_device *obd = class_exp2obd(exp);
2620         int rc;
2621
2622         rc = client_disconnect_export(exp);
2623         /**
2624          * Initially we put del_shrink_grant before disconnect_export, but it
2625          * causes the following problem if setup (connect) and cleanup
2626          * (disconnect) are tangled together.
2627          *      connect p1                     disconnect p2
2628          *   ptlrpc_connect_import
2629          *     ...............               class_manual_cleanup
2630          *                                     osc_disconnect
2631          *                                     del_shrink_grant
2632          *   ptlrpc_connect_interrupt
2633          *     init_grant_shrink
2634          *   add this client to shrink list
2635          *                                      cleanup_osc
2636          * Bang! pinger trigger the shrink.
2637          * So the osc should be disconnected from the shrink list, after we
2638          * are sure the import has been destroyed. BUG18662
2639          */
2640         if (obd->u.cli.cl_import == NULL)
2641                 osc_del_shrink_grant(&obd->u.cli);
2642         return rc;
2643 }
2644
2645 static int osc_import_event(struct obd_device *obd,
2646                             struct obd_import *imp,
2647                             enum obd_import_event event)
2648 {
2649         struct client_obd *cli;
2650         int rc = 0;
2651
2652         ENTRY;
2653         LASSERT(imp->imp_obd == obd);
2654
2655         switch (event) {
2656         case IMP_EVENT_DISCON: {
2657                 cli = &obd->u.cli;
2658                 spin_lock(&cli->cl_loi_list_lock);
2659                 cli->cl_avail_grant = 0;
2660                 cli->cl_lost_grant = 0;
2661                 spin_unlock(&cli->cl_loi_list_lock);
2662                 break;
2663         }
2664         case IMP_EVENT_INACTIVE: {
2665                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2666                 break;
2667         }
2668         case IMP_EVENT_INVALIDATE: {
2669                 struct ldlm_namespace *ns = obd->obd_namespace;
2670                 struct lu_env         *env;
2671                 int                    refcheck;
2672
2673                 env = cl_env_get(&refcheck);
2674                 if (!IS_ERR(env)) {
2675                         /* Reset grants */
2676                         cli = &obd->u.cli;
2677                         /* all pages go to failing rpcs due to the invalid
2678                          * import */
2679                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2680
2681                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2682                         cl_env_put(env, &refcheck);
2683                 } else
2684                         rc = PTR_ERR(env);
2685                 break;
2686         }
2687         case IMP_EVENT_ACTIVE: {
2688                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2689                 break;
2690         }
2691         case IMP_EVENT_OCD: {
2692                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2693
2694                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2695                         osc_init_grant(&obd->u.cli, ocd);
2696
2697                 /* See bug 7198 */
2698                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2699                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2700
2701                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2702                 break;
2703         }
2704         case IMP_EVENT_DEACTIVATE: {
2705                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2706                 break;
2707         }
2708         case IMP_EVENT_ACTIVATE: {
2709                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2710                 break;
2711         }
2712         default:
2713                 CERROR("Unknown import event %d\n", event);
2714                 LBUG();
2715         }
2716         RETURN(rc);
2717 }
2718
2719 /**
2720  * Determine whether the lock can be canceled before replaying the lock
2721  * during recovery, see bug16774 for detailed information.
2722  *
2723  * \retval zero the lock can't be canceled
2724  * \retval other ok to cancel
2725  */
2726 static int osc_cancel_weight(struct ldlm_lock *lock)
2727 {
2728         /*
2729          * Cancel all unused and granted extent lock.
2730          */
2731         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2732             lock->l_granted_mode == lock->l_req_mode &&
2733             osc_ldlm_weigh_ast(lock) == 0)
2734                 RETURN(1);
2735
2736         RETURN(0);
2737 }
2738
/* ptlrpcd work-item callback: unplug (flush) queued brw pages for the
 * client obd passed in \a data. */
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(0);
}
2748
/**
 * Set up an OSC obd device: client import, ptlrpcd writeback and LRU work
 * items, quota, procfs entries, the request pool and grant shrinking.
 *
 * \retval 0 on success
 * \retval negative errno on failure; partially initialized state is torn
 *         down through the out_* labels below
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Work item that flushes queued brw pages (see brw_queue_work). */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest.
         * NOTE(review): a proc setup failure (rc != 0 here) is not
         * propagated -- the function still returns 0 below.  Presumably
         * intentional (the device works without /proc entries); confirm. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2840
2841 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2842 {
2843         int rc = 0;
2844         ENTRY;
2845
2846         switch (stage) {
2847         case OBD_CLEANUP_EARLY: {
2848                 struct obd_import *imp;
2849                 imp = obd->u.cli.cl_import;
2850                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2851                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2852                 ptlrpc_deactivate_import(imp);
2853                 spin_lock(&imp->imp_lock);
2854                 imp->imp_pingable = 0;
2855                 spin_unlock(&imp->imp_lock);
2856                 break;
2857         }
2858         case OBD_CLEANUP_EXPORTS: {
2859                 struct client_obd *cli = &obd->u.cli;
2860                 /* LU-464
2861                  * for echo client, export may be on zombie list, wait for
2862                  * zombie thread to cull it, because cli.cl_import will be
2863                  * cleared in client_disconnect_export():
2864                  *   class_export_destroy() -> obd_cleanup() ->
2865                  *   echo_device_free() -> echo_client_cleanup() ->
2866                  *   obd_disconnect() -> osc_disconnect() ->
2867                  *   client_disconnect_export()
2868                  */
2869                 obd_zombie_barrier();
2870                 if (cli->cl_writeback_work) {
2871                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2872                         cli->cl_writeback_work = NULL;
2873                 }
2874                 if (cli->cl_lru_work) {
2875                         ptlrpcd_destroy_work(cli->cl_lru_work);
2876                         cli->cl_lru_work = NULL;
2877                 }
2878                 obd_cleanup_client_import(obd);
2879                 ptlrpc_lprocfs_unregister_obd(obd);
2880                 lprocfs_obd_cleanup(obd);
2881                 break;
2882                 }
2883         }
2884         RETURN(rc);
2885 }
2886
2887 int osc_cleanup(struct obd_device *obd)
2888 {
2889         struct client_obd *cli = &obd->u.cli;
2890         int rc;
2891
2892         ENTRY;
2893
2894         /* lru cleanup */
2895         if (cli->cl_cache != NULL) {
2896                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2897                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2898                 list_del_init(&cli->cl_lru_osc);
2899                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2900                 cli->cl_lru_left = NULL;
2901                 cl_cache_decref(cli->cl_cache);
2902                 cli->cl_cache = NULL;
2903         }
2904
2905         /* free memory of osc quota cache */
2906         osc_quota_cleanup(obd);
2907
2908         rc = client_obd_cleanup(obd);
2909
2910         ptlrpcd_decref();
2911         RETURN(rc);
2912 }
2913
2914 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2915 {
2916         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2917         return rc > 0 ? 0: rc;
2918 }
2919
2920 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2921 {
2922         return osc_process_config_base(obd, buf);
2923 }
2924
/* Method table wiring the OSC into the generic obd framework: connection
 * management is delegated to the generic client_* helpers, everything else
 * to the osc_* handlers. */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
2949
2950 static int __init osc_init(void)
2951 {
2952         bool enable_proc = true;
2953         struct obd_type *type;
2954         int rc;
2955         ENTRY;
2956
2957         /* print an address of _any_ initialized kernel symbol from this
2958          * module, to allow debugging with gdb that doesn't support data
2959          * symbols from modules.*/
2960         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2961
2962         rc = lu_kmem_init(osc_caches);
2963         if (rc)
2964                 RETURN(rc);
2965
2966         type = class_search_type(LUSTRE_OSP_NAME);
2967         if (type != NULL && type->typ_procsym != NULL)
2968                 enable_proc = false;
2969
2970         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2971                                  LUSTRE_OSC_NAME, &osc_device_type);
2972         if (rc) {
2973                 lu_kmem_fini(osc_caches);
2974                 RETURN(rc);
2975         }
2976
2977         RETURN(rc);
2978 }
2979
/* Module unload hook: tear down in the reverse order of osc_init() --
 * unregister the obd type first, then release the lu_kmem caches. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
2985
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register the module's init/exit entry points via the libcfs glue. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);