Whamcloud - gitweb
b64ad9ad017d22a3dc8b752f5104e073bd3c5f4e
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
55 struct osc_brw_async_args {
56         struct obdo              *aa_oa;
57         int                       aa_requested_nob;
58         int                       aa_nio_count;
59         obd_count                 aa_page_count;
60         int                       aa_resends;
61         struct brw_page **aa_ppga;
62         struct client_obd        *aa_cli;
63         struct list_head          aa_oaps;
64         struct list_head          aa_exts;
65         struct obd_capa  *aa_ocapa;
66         struct cl_req            *aa_clerq;
67 };
68
69 #define osc_grant_args osc_brw_async_args
70
/*
 * Async arguments of a getattr request: the obd_info whose obdo receives
 * the reply and whose oi_cb_up callback is run on completion.
 */
struct osc_async_args {
        struct obd_info         *aa_oi;
};
74
75 struct osc_setattr_args {
76         struct obdo             *sa_oa;
77         obd_enqueue_update_f     sa_upcall;
78         void                    *sa_cookie;
79 };
80
81 struct osc_fsync_args {
82         struct obd_info *fa_oi;
83         obd_enqueue_update_f     fa_upcall;
84         void                    *fa_cookie;
85 };
86
87 struct osc_enqueue_args {
88         struct obd_export       *oa_exp;
89         ldlm_type_t             oa_type;
90         ldlm_mode_t             oa_mode;
91         __u64                   *oa_flags;
92         osc_enqueue_upcall_f    oa_upcall;
93         void                    *oa_cookie;
94         struct ost_lvb          *oa_lvb;
95         struct lustre_handle    oa_lockh;
96         unsigned int            oa_agl:1;
97 };
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
103 static inline void osc_pack_capa(struct ptlrpc_request *req,
104                                  struct ost_body *body, void *capa)
105 {
106         struct obd_capa *oc = (struct obd_capa *)capa;
107         struct lustre_capa *c;
108
109         if (!capa)
110                 return;
111
112         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
113         LASSERT(c);
114         capa_cpy(c, oc);
115         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
116         DEBUG_CAPA(D_SEC, c, "pack");
117 }
118
119 static inline void osc_pack_req_body(struct ptlrpc_request *req,
120                                      struct obd_info *oinfo)
121 {
122         struct ost_body *body;
123
124         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
125         LASSERT(body);
126
127         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
128                              oinfo->oi_oa);
129         osc_pack_capa(req, body, oinfo->oi_capa);
130 }
131
132 static inline void osc_set_capa_size(struct ptlrpc_request *req,
133                                      const struct req_msg_field *field,
134                                      struct obd_capa *oc)
135 {
136         if (oc == NULL)
137                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
138         else
139                 /* it is already calculated as sizeof struct obd_capa */
140                 ;
141 }
142
143 static int osc_getattr_interpret(const struct lu_env *env,
144                                  struct ptlrpc_request *req,
145                                  struct osc_async_args *aa, int rc)
146 {
147         struct ost_body *body;
148         ENTRY;
149
150         if (rc != 0)
151                 GOTO(out, rc);
152
153         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
154         if (body) {
155                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
156                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
157                                      aa->aa_oi->oi_oa, &body->oa);
158
159                 /* This should really be sent by the OST */
160                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
161                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
162         } else {
163                 CDEBUG(D_INFO, "can't unpack ost_body\n");
164                 rc = -EPROTO;
165                 aa->aa_oi->oi_oa->o_valid = 0;
166         }
167 out:
168         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
169         RETURN(rc);
170 }
171
172 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
173                              struct ptlrpc_request_set *set)
174 {
175         struct ptlrpc_request *req;
176         struct osc_async_args *aa;
177         int                    rc;
178         ENTRY;
179
180         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
181         if (req == NULL)
182                 RETURN(-ENOMEM);
183
184         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
185         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
186         if (rc) {
187                 ptlrpc_request_free(req);
188                 RETURN(rc);
189         }
190
191         osc_pack_req_body(req, oinfo);
192
193         ptlrpc_request_set_replen(req);
194         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
195
196         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
197         aa = ptlrpc_req_async_args(req);
198         aa->aa_oi = oinfo;
199
200         ptlrpc_set_add_req(set, req);
201         RETURN(0);
202 }
203
204 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
205                        struct obd_info *oinfo)
206 {
207         struct ptlrpc_request *req;
208         struct ost_body       *body;
209         int                    rc;
210         ENTRY;
211
212         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
213         if (req == NULL)
214                 RETURN(-ENOMEM);
215
216         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oinfo);
224
225         ptlrpc_request_set_replen(req);
226
227         rc = ptlrpc_queue_wait(req);
228         if (rc)
229                 GOTO(out, rc);
230
231         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
232         if (body == NULL)
233                 GOTO(out, rc = -EPROTO);
234
235         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
236         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
237                              &body->oa);
238
239         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
240         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
241
242         EXIT;
243  out:
244         ptlrpc_req_finished(req);
245         return rc;
246 }
247
248 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
249                        struct obd_info *oinfo, struct obd_trans_info *oti)
250 {
251         struct ptlrpc_request *req;
252         struct ost_body       *body;
253         int                    rc;
254         ENTRY;
255
256         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
257
258         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
259         if (req == NULL)
260                 RETURN(-ENOMEM);
261
262         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
263         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
264         if (rc) {
265                 ptlrpc_request_free(req);
266                 RETURN(rc);
267         }
268
269         osc_pack_req_body(req, oinfo);
270
271         ptlrpc_request_set_replen(req);
272
273         rc = ptlrpc_queue_wait(req);
274         if (rc)
275                 GOTO(out, rc);
276
277         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
278         if (body == NULL)
279                 GOTO(out, rc = -EPROTO);
280
281         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
282                              &body->oa);
283
284         EXIT;
285 out:
286         ptlrpc_req_finished(req);
287         RETURN(rc);
288 }
289
290 static int osc_setattr_interpret(const struct lu_env *env,
291                                  struct ptlrpc_request *req,
292                                  struct osc_setattr_args *sa, int rc)
293 {
294         struct ost_body *body;
295         ENTRY;
296
297         if (rc != 0)
298                 GOTO(out, rc);
299
300         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
301         if (body == NULL)
302                 GOTO(out, rc = -EPROTO);
303
304         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
305                              &body->oa);
306 out:
307         rc = sa->sa_upcall(sa->sa_cookie, rc);
308         RETURN(rc);
309 }
310
311 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
312                            struct obd_trans_info *oti,
313                            obd_enqueue_update_f upcall, void *cookie,
314                            struct ptlrpc_request_set *rqset)
315 {
316         struct ptlrpc_request   *req;
317         struct osc_setattr_args *sa;
318         int                      rc;
319         ENTRY;
320
321         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
322         if (req == NULL)
323                 RETURN(-ENOMEM);
324
325         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
326         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 RETURN(rc);
330         }
331
332         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
333                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
334
335         osc_pack_req_body(req, oinfo);
336
337         ptlrpc_request_set_replen(req);
338
339         /* do mds to ost setattr asynchronously */
340         if (!rqset) {
341                 /* Do not wait for response. */
342                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
343         } else {
344                 req->rq_interpret_reply =
345                         (ptlrpc_interpterer_t)osc_setattr_interpret;
346
347                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
348                 sa = ptlrpc_req_async_args(req);
349                 sa->sa_oa = oinfo->oi_oa;
350                 sa->sa_upcall = upcall;
351                 sa->sa_cookie = cookie;
352
353                 if (rqset == PTLRPCD_SET)
354                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
355                 else
356                         ptlrpc_set_add_req(rqset, req);
357         }
358
359         RETURN(0);
360 }
361
362 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
363                              struct obd_trans_info *oti,
364                              struct ptlrpc_request_set *rqset)
365 {
366         return osc_setattr_async_base(exp, oinfo, oti,
367                                       oinfo->oi_cb_up, oinfo, rqset);
368 }
369
370 static int osc_create(const struct lu_env *env, struct obd_export *exp,
371                       struct obdo *oa, struct obd_trans_info *oti)
372 {
373         struct ptlrpc_request *req;
374         struct ost_body       *body;
375         int                    rc;
376         ENTRY;
377
378         LASSERT(oa != NULL);
379         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
380         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
381
382         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
383         if (req == NULL)
384                 GOTO(out, rc = -ENOMEM);
385
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 GOTO(out, rc);
390         }
391
392         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
393         LASSERT(body);
394
395         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
396
397         ptlrpc_request_set_replen(req);
398
399         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400             oa->o_flags == OBD_FL_DELORPHAN) {
401                 DEBUG_REQ(D_HA, req,
402                           "delorphan from OST integration");
403                 /* Don't resend the delorphan req */
404                 req->rq_no_resend = req->rq_no_delay = 1;
405         }
406
407         rc = ptlrpc_queue_wait(req);
408         if (rc)
409                 GOTO(out_req, rc);
410
411         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
412         if (body == NULL)
413                 GOTO(out_req, rc = -EPROTO);
414
415         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
416         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
417
418         oa->o_blksize = cli_brw_size(exp->exp_obd);
419         oa->o_valid |= OBD_MD_FLBLKSZ;
420
421         if (oti != NULL) {
422                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
423                         if (oti->oti_logcookies == NULL)
424                                 oti->oti_logcookies = &oti->oti_onecookie;
425
426                         *oti->oti_logcookies = oa->o_lcookie;
427                 }
428         }
429
430         CDEBUG(D_HA, "transno: "LPD64"\n",
431                lustre_msg_get_transno(req->rq_repmsg));
432 out_req:
433         ptlrpc_req_finished(req);
434 out:
435         RETURN(rc);
436 }
437
438 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
439                    obd_enqueue_update_f upcall, void *cookie,
440                    struct ptlrpc_request_set *rqset)
441 {
442         struct ptlrpc_request   *req;
443         struct osc_setattr_args *sa;
444         struct ost_body         *body;
445         int                      rc;
446         ENTRY;
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
449         if (req == NULL)
450                 RETURN(-ENOMEM);
451
452         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 RETURN(rc);
457         }
458         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
459         ptlrpc_at_set_req_timeout(req);
460
461         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
462         LASSERT(body);
463         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
464                              oinfo->oi_oa);
465         osc_pack_capa(req, body, oinfo->oi_capa);
466
467         ptlrpc_request_set_replen(req);
468
469         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
470         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
471         sa = ptlrpc_req_async_args(req);
472         sa->sa_oa     = oinfo->oi_oa;
473         sa->sa_upcall = upcall;
474         sa->sa_cookie = cookie;
475         if (rqset == PTLRPCD_SET)
476                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
477         else
478                 ptlrpc_set_add_req(rqset, req);
479
480         RETURN(0);
481 }
482
483 static int osc_sync_interpret(const struct lu_env *env,
484                               struct ptlrpc_request *req,
485                               void *arg, int rc)
486 {
487         struct osc_fsync_args *fa = arg;
488         struct ost_body *body;
489         ENTRY;
490
491         if (rc)
492                 GOTO(out, rc);
493
494         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
495         if (body == NULL) {
496                 CERROR ("can't unpack ost_body\n");
497                 GOTO(out, rc = -EPROTO);
498         }
499
500         *fa->fa_oi->oi_oa = body->oa;
501 out:
502         rc = fa->fa_upcall(fa->fa_cookie, rc);
503         RETURN(rc);
504 }
505
506 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
507                   obd_enqueue_update_f upcall, void *cookie,
508                   struct ptlrpc_request_set *rqset)
509 {
510         struct ptlrpc_request *req;
511         struct ost_body       *body;
512         struct osc_fsync_args *fa;
513         int                    rc;
514         ENTRY;
515
516         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
517         if (req == NULL)
518                 RETURN(-ENOMEM);
519
520         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
521         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
522         if (rc) {
523                 ptlrpc_request_free(req);
524                 RETURN(rc);
525         }
526
527         /* overload the size and blocks fields in the oa with start/end */
528         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
529         LASSERT(body);
530         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
531                              oinfo->oi_oa);
532         osc_pack_capa(req, body, oinfo->oi_capa);
533
534         ptlrpc_request_set_replen(req);
535         req->rq_interpret_reply = osc_sync_interpret;
536
537         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
538         fa = ptlrpc_req_async_args(req);
539         fa->fa_oi = oinfo;
540         fa->fa_upcall = upcall;
541         fa->fa_cookie = cookie;
542
543         if (rqset == PTLRPCD_SET)
544                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
545         else
546                 ptlrpc_set_add_req(rqset, req);
547
548         RETURN (0);
549 }
550
551 /* Find and cancel locally locks matched by @mode in the resource found by
552  * @objid. Found locks are added into @cancel list. Returns the amount of
553  * locks added to @cancels list. */
554 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
555                                    struct list_head *cancels,
556                                    ldlm_mode_t mode, __u64 lock_flags)
557 {
558         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
559         struct ldlm_res_id res_id;
560         struct ldlm_resource *res;
561         int count;
562         ENTRY;
563
564         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
565          * export) but disabled through procfs (flag in NS).
566          *
567          * This distinguishes from a case when ELC is not supported originally,
568          * when we still want to cancel locks in advance and just cancel them
569          * locally, without sending any RPC. */
570         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
571                 RETURN(0);
572
573         ostid_build_res_name(&oa->o_oi, &res_id);
574         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
575         if (IS_ERR(res))
576                 RETURN(0);
577
578         LDLM_RESOURCE_ADDREF(res);
579         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
580                                            lock_flags, 0, NULL);
581         LDLM_RESOURCE_DELREF(res);
582         ldlm_resource_putref(res);
583         RETURN(count);
584 }
585
586 static int osc_destroy_interpret(const struct lu_env *env,
587                                  struct ptlrpc_request *req, void *data,
588                                  int rc)
589 {
590         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
591
592         atomic_dec(&cli->cl_destroy_in_flight);
593         wake_up(&cli->cl_destroy_waitq);
594         return 0;
595 }
596
597 static int osc_can_send_destroy(struct client_obd *cli)
598 {
599         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
600             cli->cl_max_rpcs_in_flight) {
601                 /* The destroy request can be sent */
602                 return 1;
603         }
604         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
605             cli->cl_max_rpcs_in_flight) {
606                 /*
607                  * The counter has been modified between the two atomic
608                  * operations.
609                  */
610                 wake_up(&cli->cl_destroy_waitq);
611         }
612         return 0;
613 }
614
615 /* Destroy requests can be async always on the client, and we don't even really
616  * care about the return code since the client cannot do anything at all about
617  * a destroy failure.
618  * When the MDS is unlinking a filename, it saves the file objects into a
619  * recovery llog, and these object records are cancelled when the OST reports
620  * they were destroyed and sync'd to disk (i.e. transaction committed).
621  * If the client dies, or the OST is down when the object should be destroyed,
622  * the records are not cancelled, and when the OST reconnects to the MDS next,
623  * it will retrieve the llog unlink logs and then sends the log cancellation
624  * cookies to the MDS after committing destroy transactions. */
625 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
626                        struct obdo *oa, struct obd_trans_info *oti)
627 {
628         struct client_obd     *cli = &exp->exp_obd->u.cli;
629         struct ptlrpc_request *req;
630         struct ost_body       *body;
631         struct list_head       cancels = LIST_HEAD_INIT(cancels);
632         int rc, count;
633         ENTRY;
634
635         if (!oa) {
636                 CDEBUG(D_INFO, "oa NULL\n");
637                 RETURN(-EINVAL);
638         }
639
640         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
641                                         LDLM_FL_DISCARD_DATA);
642
643         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
644         if (req == NULL) {
645                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
646                 RETURN(-ENOMEM);
647         }
648
649         osc_set_capa_size(req, &RMF_CAPA1, NULL);
650         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
651                                0, &cancels, count);
652         if (rc) {
653                 ptlrpc_request_free(req);
654                 RETURN(rc);
655         }
656
657         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
658         ptlrpc_at_set_req_timeout(req);
659
660         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
661                 oa->o_lcookie = *oti->oti_logcookies;
662         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
663         LASSERT(body);
664         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
665
666         ptlrpc_request_set_replen(req);
667
668         /* If osc_destory is for destroying the unlink orphan,
669          * sent from MDT to OST, which should not be blocked here,
670          * because the process might be triggered by ptlrpcd, and
671          * it is not good to block ptlrpcd thread (b=16006)*/
672         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
673                 req->rq_interpret_reply = osc_destroy_interpret;
674                 if (!osc_can_send_destroy(cli)) {
675                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
676                                                           NULL);
677
678                         /*
679                          * Wait until the number of on-going destroy RPCs drops
680                          * under max_rpc_in_flight
681                          */
682                         l_wait_event_exclusive(cli->cl_destroy_waitq,
683                                                osc_can_send_destroy(cli), &lwi);
684                 }
685         }
686
687         /* Do not wait for response */
688         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
689         RETURN(0);
690 }
691
692 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
693                                 long writing_bytes)
694 {
695         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
696
697         LASSERT(!(oa->o_valid & bits));
698
699         oa->o_valid |= bits;
700         spin_lock(&cli->cl_loi_list_lock);
701         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
702         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
703                      cli->cl_dirty_max_pages)) {
704                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
705                        cli->cl_dirty_pages, cli->cl_dirty_transit,
706                        cli->cl_dirty_max_pages);
707                 oa->o_undirty = 0;
708         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
709                             atomic_long_read(&obd_dirty_transit_pages) >
710                             (obd_max_dirty_pages + 1))) {
711                 /* The atomic_read() allowing the atomic_inc() are
712                  * not covered by a lock thus they may safely race and trip
713                  * this CERROR() unless we add in a small fudge factor (+1). */
714                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
715                        cli->cl_import->imp_obd->obd_name,
716                        atomic_long_read(&obd_dirty_pages),
717                        atomic_long_read(&obd_dirty_transit_pages),
718                        obd_max_dirty_pages);
719                 oa->o_undirty = 0;
720         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
721                             0x7fffffff)) {
722                 CERROR("dirty %lu - dirty_max %lu too big???\n",
723                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
724                 oa->o_undirty = 0;
725         } else {
726                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
727                                       PAGE_CACHE_SHIFT) *
728                                      (cli->cl_max_rpcs_in_flight + 1);
729                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
730                                     max_in_flight);
731         }
732         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
733         oa->o_dropped = cli->cl_lost_grant;
734         cli->cl_lost_grant = 0;
735         spin_unlock(&cli->cl_loi_list_lock);
736         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
737                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
738
739 }
740
741 void osc_update_next_shrink(struct client_obd *cli)
742 {
743         cli->cl_next_shrink_grant =
744                 cfs_time_shift(cli->cl_grant_shrink_interval);
745         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
746                cli->cl_next_shrink_grant);
747 }
748
749 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
750 {
751         spin_lock(&cli->cl_loi_list_lock);
752         cli->cl_avail_grant += grant;
753         spin_unlock(&cli->cl_loi_list_lock);
754 }
755
756 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
757 {
758         if (body->oa.o_valid & OBD_MD_FLGRANT) {
759                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
760                 __osc_update_grant(cli, body->oa.o_grant);
761         }
762 }
763
764 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
765                               obd_count keylen, void *key, obd_count vallen,
766                               void *val, struct ptlrpc_request_set *set);
767
768 static int osc_shrink_grant_interpret(const struct lu_env *env,
769                                       struct ptlrpc_request *req,
770                                       void *aa, int rc)
771 {
772         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
773         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
774         struct ost_body *body;
775
776         if (rc != 0) {
777                 __osc_update_grant(cli, oa->o_grant);
778                 GOTO(out, rc);
779         }
780
781         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
782         LASSERT(body);
783         osc_update_grant(cli, body);
784 out:
785         OBDO_FREE(oa);
786         return rc;
787 }
788
789 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
790 {
791         spin_lock(&cli->cl_loi_list_lock);
792         oa->o_grant = cli->cl_avail_grant / 4;
793         cli->cl_avail_grant -= oa->o_grant;
794         spin_unlock(&cli->cl_loi_list_lock);
795         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
796                 oa->o_valid |= OBD_MD_FLFLAGS;
797                 oa->o_flags = 0;
798         }
799         oa->o_flags |= OBD_FL_SHRINK_GRANT;
800         osc_update_next_shrink(cli);
801 }
802
803 /* Shrink the current grant, either from some large amount to enough for a
804  * full set of in-flight RPCs, or if we have already shrunk to that limit
805  * then to enough for a single RPC.  This avoids keeping more grant than
806  * needed, and avoids shrinking the grant piecemeal. */
807 static int osc_shrink_grant(struct client_obd *cli)
808 {
809         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
810                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
811
812         spin_lock(&cli->cl_loi_list_lock);
813         if (cli->cl_avail_grant <= target_bytes)
814                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
815         spin_unlock(&cli->cl_loi_list_lock);
816
817         return osc_shrink_grant_to_target(cli, target_bytes);
818 }
819
820 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
821 {
822         int                     rc = 0;
823         struct ost_body        *body;
824         ENTRY;
825
826         spin_lock(&cli->cl_loi_list_lock);
827         /* Don't shrink if we are already above or below the desired limit
828          * We don't want to shrink below a single RPC, as that will negatively
829          * impact block allocation and long-term performance. */
830         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
831                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
832
833         if (target_bytes >= cli->cl_avail_grant) {
834                 spin_unlock(&cli->cl_loi_list_lock);
835                 RETURN(0);
836         }
837         spin_unlock(&cli->cl_loi_list_lock);
838
839         OBD_ALLOC_PTR(body);
840         if (!body)
841                 RETURN(-ENOMEM);
842
843         osc_announce_cached(cli, &body->oa, 0);
844
845         spin_lock(&cli->cl_loi_list_lock);
846         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
847         cli->cl_avail_grant = target_bytes;
848         spin_unlock(&cli->cl_loi_list_lock);
849         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
850                 body->oa.o_valid |= OBD_MD_FLFLAGS;
851                 body->oa.o_flags = 0;
852         }
853         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
854         osc_update_next_shrink(cli);
855
856         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
857                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
858                                 sizeof(*body), body, NULL);
859         if (rc != 0)
860                 __osc_update_grant(cli, body->oa.o_grant);
861         OBD_FREE_PTR(body);
862         RETURN(rc);
863 }
864
865 static int osc_should_shrink_grant(struct client_obd *client)
866 {
867         cfs_time_t time = cfs_time_current();
868         cfs_time_t next_shrink = client->cl_next_shrink_grant;
869
870         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
871              OBD_CONNECT_GRANT_SHRINK) == 0)
872                 return 0;
873
874         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
875                 /* Get the current RPC size directly, instead of going via:
876                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
877                  * Keep comment here so that it can be found by searching. */
878                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
879
880                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
881                     client->cl_avail_grant > brw_size)
882                         return 1;
883                 else
884                         osc_update_next_shrink(client);
885         }
886         return 0;
887 }
888
889 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
890 {
891         struct client_obd *client;
892
893         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
894                 if (osc_should_shrink_grant(client))
895                         osc_shrink_grant(client);
896         }
897         return 0;
898 }
899
900 static int osc_add_shrink_grant(struct client_obd *client)
901 {
902         int rc;
903
904         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
905                                        TIMEOUT_GRANT,
906                                        osc_grant_shrink_grant_cb, NULL,
907                                        &client->cl_grant_shrink_list);
908         if (rc) {
909                 CERROR("add grant client %s error %d\n",
910                         client->cl_import->imp_obd->obd_name, rc);
911                 return rc;
912         }
913         CDEBUG(D_CACHE, "add grant client %s \n",
914                client->cl_import->imp_obd->obd_name);
915         osc_update_next_shrink(client);
916         return 0;
917 }
918
919 static int osc_del_shrink_grant(struct client_obd *client)
920 {
921         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
922                                          TIMEOUT_GRANT);
923 }
924
925 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
926 {
927         /*
928          * ocd_grant is the total grant amount we're expect to hold: if we've
929          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
930          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
931          * dirty.
932          *
933          * race is tolerable here: if we're evicted, but imp_state already
934          * left EVICTED state, then cl_dirty_pages must be 0 already.
935          */
936         spin_lock(&cli->cl_loi_list_lock);
937         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
938                 cli->cl_avail_grant = ocd->ocd_grant;
939         else
940                 cli->cl_avail_grant = ocd->ocd_grant -
941                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
942
943         if (cli->cl_avail_grant < 0) {
944                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
945                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
946                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
947                 /* workaround for servers which do not have the patch from
948                  * LU-2679 */
949                 cli->cl_avail_grant = ocd->ocd_grant;
950         }
951
952         /* determine the appropriate chunk size used by osc_extent. */
953         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
954         spin_unlock(&cli->cl_loi_list_lock);
955
956         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
957                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
958                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
959
960         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
961             list_empty(&cli->cl_grant_shrink_list))
962                 osc_add_shrink_grant(cli);
963 }
964
965 /* We assume that the reason this OSC got a short read is because it read
966  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
967  * via the LOV, and it _knows_ it's reading inside the file, it's just that
968  * this stripe never got written at or beyond this stripe offset yet. */
969 static void handle_short_read(int nob_read, obd_count page_count,
970                               struct brw_page **pga)
971 {
972         char *ptr;
973         int i = 0;
974
975         /* skip bytes read OK */
976         while (nob_read > 0) {
977                 LASSERT (page_count > 0);
978
979                 if (pga[i]->count > nob_read) {
980                         /* EOF inside this page */
981                         ptr = kmap(pga[i]->pg) +
982                                 (pga[i]->off & ~CFS_PAGE_MASK);
983                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
984                         kunmap(pga[i]->pg);
985                         page_count--;
986                         i++;
987                         break;
988                 }
989
990                 nob_read -= pga[i]->count;
991                 page_count--;
992                 i++;
993         }
994
995         /* zero remaining pages */
996         while (page_count-- > 0) {
997                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
998                 memset(ptr, 0, pga[i]->count);
999                 kunmap(pga[i]->pg);
1000                 i++;
1001         }
1002 }
1003
1004 static int check_write_rcs(struct ptlrpc_request *req,
1005                            int requested_nob, int niocount,
1006                            obd_count page_count, struct brw_page **pga)
1007 {
1008         int     i;
1009         __u32   *remote_rcs;
1010
1011         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1012                                                   sizeof(*remote_rcs) *
1013                                                   niocount);
1014         if (remote_rcs == NULL) {
1015                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1016                 return(-EPROTO);
1017         }
1018
1019         /* return error if any niobuf was in error */
1020         for (i = 0; i < niocount; i++) {
1021                 if ((int)remote_rcs[i] < 0)
1022                         return(remote_rcs[i]);
1023
1024                 if (remote_rcs[i] != 0) {
1025                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1026                                 i, remote_rcs[i], req);
1027                         return(-EPROTO);
1028                 }
1029         }
1030
1031         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1032                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1033                        req->rq_bulk->bd_nob_transferred, requested_nob);
1034                 return(-EPROTO);
1035         }
1036
1037         return (0);
1038 }
1039
1040 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1041 {
1042         if (p1->flag != p2->flag) {
1043                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1044                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1045                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1046
1047                 /* warn if we try to combine flags that we don't know to be
1048                  * safe to combine */
1049                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1050                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1051                               "report this at https://jira.hpdd.intel.com/\n",
1052                               p1->flag, p2->flag);
1053                 }
1054                 return 0;
1055         }
1056
1057         return (p1->off + p1->count == p2->off);
1058 }
1059
1060 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1061                                    struct brw_page **pga, int opc,
1062                                    cksum_type_t cksum_type)
1063 {
1064         __u32                           cksum;
1065         int                             i = 0;
1066         struct cfs_crypto_hash_desc     *hdesc;
1067         unsigned int                    bufsize;
1068         int                             err;
1069         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1070
1071         LASSERT(pg_count > 0);
1072
1073         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1074         if (IS_ERR(hdesc)) {
1075                 CERROR("Unable to initialize checksum hash %s\n",
1076                        cfs_crypto_hash_name(cfs_alg));
1077                 return PTR_ERR(hdesc);
1078         }
1079
1080         while (nob > 0 && pg_count > 0) {
1081                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1082
1083                 /* corrupt the data before we compute the checksum, to
1084                  * simulate an OST->client data error */
1085                 if (i == 0 && opc == OST_READ &&
1086                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1087                         unsigned char *ptr = kmap(pga[i]->pg);
1088                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1089
1090                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1091                         kunmap(pga[i]->pg);
1092                 }
1093                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1094                                             pga[i]->off & ~CFS_PAGE_MASK,
1095                                             count);
1096                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1097                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1098
1099                 nob -= pga[i]->count;
1100                 pg_count--;
1101                 i++;
1102         }
1103
1104         bufsize = sizeof(cksum);
1105         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1106
1107         /* For sending we only compute the wrong checksum instead
1108          * of corrupting the data so it is still correct on a redo */
1109         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1110                 cksum++;
1111
1112         return cksum;
1113 }
1114
1115 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1116                                 struct lov_stripe_md *lsm, obd_count page_count,
1117                                 struct brw_page **pga,
1118                                 struct ptlrpc_request **reqp,
1119                                 struct obd_capa *ocapa, int reserve,
1120                                 int resend)
1121 {
1122         struct ptlrpc_request   *req;
1123         struct ptlrpc_bulk_desc *desc;
1124         struct ost_body         *body;
1125         struct obd_ioobj        *ioobj;
1126         struct niobuf_remote    *niobuf;
1127         int niocount, i, requested_nob, opc, rc;
1128         struct osc_brw_async_args *aa;
1129         struct req_capsule      *pill;
1130         struct brw_page *pg_prev;
1131
1132         ENTRY;
1133         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1134                 RETURN(-ENOMEM); /* Recoverable */
1135         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1136                 RETURN(-EINVAL); /* Fatal */
1137
1138         if ((cmd & OBD_BRW_WRITE) != 0) {
1139                 opc = OST_WRITE;
1140                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1141                                                 cli->cl_import->imp_rq_pool,
1142                                                 &RQF_OST_BRW_WRITE);
1143         } else {
1144                 opc = OST_READ;
1145                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1146         }
1147         if (req == NULL)
1148                 RETURN(-ENOMEM);
1149
1150         for (niocount = i = 1; i < page_count; i++) {
1151                 if (!can_merge_pages(pga[i - 1], pga[i]))
1152                         niocount++;
1153         }
1154
1155         pill = &req->rq_pill;
1156         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1157                              sizeof(*ioobj));
1158         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1159                              niocount * sizeof(*niobuf));
1160         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1161
1162         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1163         if (rc) {
1164                 ptlrpc_request_free(req);
1165                 RETURN(rc);
1166         }
1167         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1168         ptlrpc_at_set_req_timeout(req);
1169         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1170          * retry logic */
1171         req->rq_no_retry_einprogress = 1;
1172
1173         desc = ptlrpc_prep_bulk_imp(req, page_count,
1174                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1175                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1176                 OST_BULK_PORTAL);
1177
1178         if (desc == NULL)
1179                 GOTO(out, rc = -ENOMEM);
1180         /* NB request now owns desc and will free it when it gets freed */
1181
1182         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1183         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1184         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1185         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1186
1187         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1188
1189         obdo_to_ioobj(oa, ioobj);
1190         ioobj->ioo_bufcnt = niocount;
1191         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1192          * that might be send for this request.  The actual number is decided
1193          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1194          * "max - 1" for old client compatibility sending "0", and also so the
1195          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1196         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1197         osc_pack_capa(req, body, ocapa);
1198         LASSERT(page_count > 0);
1199         pg_prev = pga[0];
1200         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1201                 struct brw_page *pg = pga[i];
1202                 int poff = pg->off & ~CFS_PAGE_MASK;
1203
1204                 LASSERT(pg->count > 0);
1205                 /* make sure there is no gap in the middle of page array */
1206                 LASSERTF(page_count == 1 ||
1207                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1208                           ergo(i > 0 && i < page_count - 1,
1209                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1210                           ergo(i == page_count - 1, poff == 0)),
1211                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1212                          i, page_count, pg, pg->off, pg->count);
1213                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1214                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1215                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1216                          i, page_count,
1217                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1218                          pg_prev->pg, page_private(pg_prev->pg),
1219                          pg_prev->pg->index, pg_prev->off);
1220                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1221                         (pg->flag & OBD_BRW_SRVLOCK));
1222
1223                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1224                 requested_nob += pg->count;
1225
1226                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1227                         niobuf--;
1228                         niobuf->rnb_len += pg->count;
1229                 } else {
1230                         niobuf->rnb_offset = pg->off;
1231                         niobuf->rnb_len    = pg->count;
1232                         niobuf->rnb_flags  = pg->flag;
1233                 }
1234                 pg_prev = pg;
1235         }
1236
1237         LASSERTF((void *)(niobuf - niocount) ==
1238                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1239                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1240                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1241
1242         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1243         if (resend) {
1244                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1245                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1246                         body->oa.o_flags = 0;
1247                 }
1248                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1249         }
1250
1251         if (osc_should_shrink_grant(cli))
1252                 osc_shrink_grant_local(cli, &body->oa);
1253
1254         /* size[REQ_REC_OFF] still sizeof (*body) */
1255         if (opc == OST_WRITE) {
1256                 if (cli->cl_checksum &&
1257                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1258                         /* store cl_cksum_type in a local variable since
1259                          * it can be changed via lprocfs */
1260                         cksum_type_t cksum_type = cli->cl_cksum_type;
1261
1262                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1263                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1264                                 body->oa.o_flags = 0;
1265                         }
1266                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1267                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1268                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1269                                                              page_count, pga,
1270                                                              OST_WRITE,
1271                                                              cksum_type);
1272                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1273                                body->oa.o_cksum);
1274                         /* save this in 'oa', too, for later checking */
1275                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1276                         oa->o_flags |= cksum_type_pack(cksum_type);
1277                 } else {
1278                         /* clear out the checksum flag, in case this is a
1279                          * resend but cl_checksum is no longer set. b=11238 */
1280                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1281                 }
1282                 oa->o_cksum = body->oa.o_cksum;
1283                 /* 1 RC per niobuf */
1284                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1285                                      sizeof(__u32) * niocount);
1286         } else {
1287                 if (cli->cl_checksum &&
1288                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1289                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1290                                 body->oa.o_flags = 0;
1291                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1292                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1293                 }
1294         }
1295         ptlrpc_request_set_replen(req);
1296
1297         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1298         aa = ptlrpc_req_async_args(req);
1299         aa->aa_oa = oa;
1300         aa->aa_requested_nob = requested_nob;
1301         aa->aa_nio_count = niocount;
1302         aa->aa_page_count = page_count;
1303         aa->aa_resends = 0;
1304         aa->aa_ppga = pga;
1305         aa->aa_cli = cli;
1306         INIT_LIST_HEAD(&aa->aa_oaps);
1307         if (ocapa && reserve)
1308                 aa->aa_ocapa = capa_get(ocapa);
1309
1310         *reqp = req;
1311         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1312         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1313                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1314                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1315         RETURN(0);
1316
1317  out:
1318         ptlrpc_req_finished(req);
1319         RETURN(rc);
1320 }
1321
1322 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1323                                 __u32 client_cksum, __u32 server_cksum, int nob,
1324                                 obd_count page_count, struct brw_page **pga,
1325                                 cksum_type_t client_cksum_type)
1326 {
1327         __u32 new_cksum;
1328         char *msg;
1329         cksum_type_t cksum_type;
1330
1331         if (server_cksum == client_cksum) {
1332                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1333                 return 0;
1334         }
1335
1336         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1337                                        oa->o_flags : 0);
1338         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1339                                       cksum_type);
1340
1341         if (cksum_type != client_cksum_type)
1342                 msg = "the server did not use the checksum type specified in "
1343                       "the original request - likely a protocol problem";
1344         else if (new_cksum == server_cksum)
1345                 msg = "changed on the client after we checksummed it - "
1346                       "likely false positive due to mmap IO (bug 11742)";
1347         else if (new_cksum == client_cksum)
1348                 msg = "changed in transit before arrival at OST";
1349         else
1350                 msg = "changed in transit AND doesn't match the original - "
1351                       "likely false positive due to mmap IO (bug 11742)";
1352
1353         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1354                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1355                            msg, libcfs_nid2str(peer->nid),
1356                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1357                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1358                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1359                            POSTID(&oa->o_oi), pga[0]->off,
1360                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1361         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1362                "client csum now %x\n", client_cksum, client_cksum_type,
1363                server_cksum, cksum_type, new_cksum);
1364         return 1;
1365 }
1366
1367 /* Note rc enters this function as number of bytes transferred */
1368 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1369 {
1370         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1371         const lnet_process_id_t *peer =
1372                         &req->rq_import->imp_connection->c_peer;
1373         struct client_obd *cli = aa->aa_cli;
1374         struct ost_body *body;
1375         u32 client_cksum = 0;
1376         ENTRY;
1377
1378         if (rc < 0 && rc != -EDQUOT) {
1379                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1380                 RETURN(rc);
1381         }
1382
1383         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1384         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1385         if (body == NULL) {
1386                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1387                 RETURN(-EPROTO);
1388         }
1389
1390         /* set/clear over quota flag for a uid/gid */
1391         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1392             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1393                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1394
1395                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1396                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1397                        body->oa.o_flags);
1398                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1399         }
1400
1401         osc_update_grant(cli, body);
1402
1403         if (rc < 0)
1404                 RETURN(rc);
1405
1406         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1407                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1408
1409         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1410                 if (rc > 0) {
1411                         CERROR("Unexpected +ve rc %d\n", rc);
1412                         RETURN(-EPROTO);
1413                 }
1414                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1415
1416                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1417                         RETURN(-EAGAIN);
1418
1419                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1420                     check_write_checksum(&body->oa, peer, client_cksum,
1421                                          body->oa.o_cksum, aa->aa_requested_nob,
1422                                          aa->aa_page_count, aa->aa_ppga,
1423                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1424                         RETURN(-EAGAIN);
1425
1426                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1427                                      aa->aa_page_count, aa->aa_ppga);
1428                 GOTO(out, rc);
1429         }
1430
1431         /* The rest of this function executes only for OST_READs */
1432
1433         /* if unwrap_bulk failed, return -EAGAIN to retry */
1434         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1435         if (rc < 0)
1436                 GOTO(out, rc = -EAGAIN);
1437
1438         if (rc > aa->aa_requested_nob) {
1439                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1440                        aa->aa_requested_nob);
1441                 RETURN(-EPROTO);
1442         }
1443
1444         if (rc != req->rq_bulk->bd_nob_transferred) {
1445                 CERROR ("Unexpected rc %d (%d transferred)\n",
1446                         rc, req->rq_bulk->bd_nob_transferred);
1447                 return (-EPROTO);
1448         }
1449
1450         if (rc < aa->aa_requested_nob)
1451                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1452
1453         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1454                 static int cksum_counter;
1455                 u32        server_cksum = body->oa.o_cksum;
1456                 char      *via = "";
1457                 char      *router = "";
1458                 cksum_type_t cksum_type;
1459
1460                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1461                                                body->oa.o_flags : 0);
1462                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1463                                                  aa->aa_ppga, OST_READ,
1464                                                  cksum_type);
1465
1466                 if (peer->nid != req->rq_bulk->bd_sender) {
1467                         via = " via ";
1468                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1469                 }
1470
1471                 if (server_cksum != client_cksum) {
1472                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1473                                            "%s%s%s inode "DFID" object "DOSTID
1474                                            " extent ["LPU64"-"LPU64"]\n",
1475                                            req->rq_import->imp_obd->obd_name,
1476                                            libcfs_nid2str(peer->nid),
1477                                            via, router,
1478                                            body->oa.o_valid & OBD_MD_FLFID ?
1479                                                 body->oa.o_parent_seq : (__u64)0,
1480                                            body->oa.o_valid & OBD_MD_FLFID ?
1481                                                 body->oa.o_parent_oid : 0,
1482                                            body->oa.o_valid & OBD_MD_FLFID ?
1483                                                 body->oa.o_parent_ver : 0,
1484                                            POSTID(&body->oa.o_oi),
1485                                            aa->aa_ppga[0]->off,
1486                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1487                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1488                                                                         1);
1489                         CERROR("client %x, server %x, cksum_type %x\n",
1490                                client_cksum, server_cksum, cksum_type);
1491                         cksum_counter = 0;
1492                         aa->aa_oa->o_cksum = client_cksum;
1493                         rc = -EAGAIN;
1494                 } else {
1495                         cksum_counter++;
1496                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1497                         rc = 0;
1498                 }
1499         } else if (unlikely(client_cksum)) {
1500                 static int cksum_missed;
1501
1502                 cksum_missed++;
1503                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1504                         CERROR("Checksum %u requested from %s but not sent\n",
1505                                cksum_missed, libcfs_nid2str(peer->nid));
1506         } else {
1507                 rc = 0;
1508         }
1509 out:
1510         if (rc >= 0)
1511                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1512                                      aa->aa_oa, &body->oa);
1513
1514         RETURN(rc);
1515 }
1516
1517 static int osc_brw_redo_request(struct ptlrpc_request *request,
1518                                 struct osc_brw_async_args *aa, int rc)
1519 {
1520         struct ptlrpc_request *new_req;
1521         struct osc_brw_async_args *new_aa;
1522         struct osc_async_page *oap;
1523         ENTRY;
1524
1525         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1526                   "redo for recoverable error %d", rc);
1527
1528         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1529                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1530                                   aa->aa_cli, aa->aa_oa,
1531                                   NULL /* lsm unused by osc currently */,
1532                                   aa->aa_page_count, aa->aa_ppga,
1533                                   &new_req, aa->aa_ocapa, 0, 1);
1534         if (rc)
1535                 RETURN(rc);
1536
1537         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1538                 if (oap->oap_request != NULL) {
1539                         LASSERTF(request == oap->oap_request,
1540                                  "request %p != oap_request %p\n",
1541                                  request, oap->oap_request);
1542                         if (oap->oap_interrupted) {
1543                                 ptlrpc_req_finished(new_req);
1544                                 RETURN(-EINTR);
1545                         }
1546                 }
1547         }
1548         /* New request takes over pga and oaps from old request.
1549          * Note that copying a list_head doesn't work, need to move it... */
1550         aa->aa_resends++;
1551         new_req->rq_interpret_reply = request->rq_interpret_reply;
1552         new_req->rq_async_args = request->rq_async_args;
1553         new_req->rq_commit_cb = request->rq_commit_cb;
1554         /* cap resend delay to the current request timeout, this is similar to
1555          * what ptlrpc does (see after_reply()) */
1556         if (aa->aa_resends > new_req->rq_timeout)
1557                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1558         else
1559                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1560         new_req->rq_generation_set = 1;
1561         new_req->rq_import_generation = request->rq_import_generation;
1562
1563         new_aa = ptlrpc_req_async_args(new_req);
1564
1565         INIT_LIST_HEAD(&new_aa->aa_oaps);
1566         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1567         INIT_LIST_HEAD(&new_aa->aa_exts);
1568         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1569         new_aa->aa_resends = aa->aa_resends;
1570
1571         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1572                 if (oap->oap_request) {
1573                         ptlrpc_req_finished(oap->oap_request);
1574                         oap->oap_request = ptlrpc_request_addref(new_req);
1575                 }
1576         }
1577
1578         new_aa->aa_ocapa = aa->aa_ocapa;
1579         aa->aa_ocapa = NULL;
1580
1581         /* XXX: This code will run into problem if we're going to support
1582          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1583          * and wait for all of them to be finished. We should inherit request
1584          * set from old request. */
1585         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1586
1587         DEBUG_REQ(D_INFO, new_req, "new request");
1588         RETURN(0);
1589 }
1590
1591 /*
1592  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1593  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1594  * fine for our small page arrays and doesn't require allocation.  its an
1595  * insertion sort that swaps elements that are strides apart, shrinking the
1596  * stride down until its '1' and the array is sorted.
1597  */
1598 static void sort_brw_pages(struct brw_page **array, int num)
1599 {
1600         int stride, i, j;
1601         struct brw_page *tmp;
1602
1603         if (num == 1)
1604                 return;
1605         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1606                 ;
1607
1608         do {
1609                 stride /= 3;
1610                 for (i = stride ; i < num ; i++) {
1611                         tmp = array[i];
1612                         j = i;
1613                         while (j >= stride && array[j - stride]->off > tmp->off) {
1614                                 array[j] = array[j - stride];
1615                                 j -= stride;
1616                         }
1617                         array[j] = tmp;
1618                 }
1619         } while (stride > 1);
1620 }
1621
1622 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1623 {
1624         LASSERT(ppga != NULL);
1625         OBD_FREE(ppga, sizeof(*ppga) * count);
1626 }
1627
/*
 * Reply interpreter for a bulk read/write RPC (osc_build_rpc() installs it
 * as rq_interpret_reply).
 *
 * The reply is first digested by osc_brw_fini_request().  A recoverable
 * error is retried through osc_brw_redo_request() unless the import
 * generation changed or the resend limit was reached.  Otherwise the
 * function updates the object attributes on success, finishes every extent
 * attached to the RPC, releases the per-RPC resources and drops the RPC
 * from the client's in-flight count.
 *
 * \param env   execution environment
 * \param req   the completed request
 * \param data  the request's async args (struct osc_brw_async_args)
 * \param rc    completion status handed in by the caller
 *
 * \retval 0    success, or the RPC was queued again for resend
 * \retval <0   final error; -EAGAIN and -EINPROGRESS are reported as -EIO
 */
1628 static int brw_interpret(const struct lu_env *env,
1629                          struct ptlrpc_request *req, void *data, int rc)
1630 {
1631         struct osc_brw_async_args *aa = data;
1632         struct osc_extent *ext;
1633         struct osc_extent *tmp;
1634         struct client_obd *cli = aa->aa_cli;
1635         ENTRY;
1636
1637         rc = osc_brw_fini_request(req, rc);
1638         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1639         /* When server return -EINPROGRESS, client should always retry
1640          * regardless of the number of times the bulk was resent already. */
1641         if (osc_recoverable_error(rc)) {
1642                 if (req->rq_import_generation !=
1643                     req->rq_import->imp_generation) {
1644                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1645                                ""DOSTID", rc = %d.\n",
1646                                req->rq_import->imp_obd->obd_name,
1647                                POSTID(&aa->aa_oa->o_oi), rc);
1648                 } else if (rc == -EINPROGRESS ||
1649                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1650                         rc = osc_brw_redo_request(req, aa, rc);
1651                 } else {
1652                         CERROR("%s: too many resent retries for object: "
1653                                ""LPU64":"LPU64", rc = %d.\n",
1654                                req->rq_import->imp_obd->obd_name,
1655                                POSTID(&aa->aa_oa->o_oi), rc);
1656                 }
1657
             /* rc == 0 here means the redo was queued: the new request has
              * taken over the pages and extents, so nothing is torn down.
              * A retryable code that was not retried becomes -EIO. */
1658                 if (rc == 0)
1659                         RETURN(0);
1660                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1661                         rc = -EIO;
1662         }
1663
1664         if (aa->aa_ocapa) {
1665                 capa_put(aa->aa_ocapa);
1666                 aa->aa_ocapa = NULL;
1667         }
1668
1669         if (rc == 0) {
1670                 struct obdo *oa = aa->aa_oa;
1671                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1672                 unsigned long valid = 0;
1673                 struct cl_object *obj;
1674                 struct osc_async_page *last;
1675
             /* the object is looked up through the last page of the RPC */
1676                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1677                 obj = osc2cl(last->oap_obj);
1678
             /* copy the attributes flagged valid in the obdo into attr,
              * recording each one in the 'valid' mask */
1679                 cl_object_attr_lock(obj);
1680                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1681                         attr->cat_blocks = oa->o_blocks;
1682                         valid |= CAT_BLOCKS;
1683                 }
1684                 if (oa->o_valid & OBD_MD_FLMTIME) {
1685                         attr->cat_mtime = oa->o_mtime;
1686                         valid |= CAT_MTIME;
1687                 }
1688                 if (oa->o_valid & OBD_MD_FLATIME) {
1689                         attr->cat_atime = oa->o_atime;
1690                         valid |= CAT_ATIME;
1691                 }
1692                 if (oa->o_valid & OBD_MD_FLCTIME) {
1693                         attr->cat_ctime = oa->o_ctime;
1694                         valid |= CAT_CTIME;
1695                 }
1696
1697                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1698                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                     /* offset just past the last byte of the last page */
1699                         loff_t last_off = last->oap_count + last->oap_obj_off +
1700                                 last->oap_page_off;
1701
1702                         /* Change file size if this is an out of quota or
1703                          * direct IO write and it extends the file size */
1704                         if (loi->loi_lvb.lvb_size < last_off) {
1705                                 attr->cat_size = last_off;
1706                                 valid |= CAT_SIZE;
1707                         }
1708                         /* Extend KMS if it's not a lockless write */
1709                         if (loi->loi_kms < last_off &&
1710                             oap2osc_page(last)->ops_srvlock == 0) {
1711                                 attr->cat_kms = last_off;
1712                                 valid |= CAT_KMS;
1713                         }
1714                 }
1715
1716                 if (valid != 0)
1717                         cl_object_attr_update(env, obj, attr, valid);
1718                 cl_object_attr_unlock(obj);
1719         }
1720         OBDO_FREE(aa->aa_oa);
1721
1722         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1723                 osc_inc_unstable_pages(req);
1724
     /* finish every extent carried by this RPC with the final status */
1725         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1726                 list_del_init(&ext->oe_link);
1727                 osc_extent_finish(env, ext, 1, rc);
1728         }
1729         LASSERT(list_empty(&aa->aa_exts));
1730         LASSERT(list_empty(&aa->aa_oaps));
1731
     /* report the error, or the number of bytes transferred on success */
1732         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1733                           req->rq_bulk->bd_nob_transferred);
1734         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1735         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1736
1737         spin_lock(&cli->cl_loi_list_lock);
1738         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1739          * is called so we know whether to go to sync BRWs or wait for more
1740          * RPCs to complete */
1741         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1742                 cli->cl_w_in_flight--;
1743         else
1744                 cli->cl_r_in_flight--;
1745         osc_wake_cache_waiters(cli);
1746         spin_unlock(&cli->cl_loi_list_lock);
1747
1748         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1749         RETURN(rc);
1750 }
1751
1752 static void brw_commit(struct ptlrpc_request *req)
1753 {
1754         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1755          * this called via the rq_commit_cb, I need to ensure
1756          * osc_dec_unstable_pages is still called. Otherwise unstable
1757          * pages may be leaked. */
1758         spin_lock(&req->rq_lock);
1759         if (likely(req->rq_unstable)) {
1760                 req->rq_unstable = 0;
1761                 spin_unlock(&req->rq_lock);
1762
1763                 osc_dec_unstable_pages(req);
1764         } else {
1765                 req->rq_committed = 1;
1766                 spin_unlock(&req->rq_lock);
1767         }
1768 }
1769
1770 /**
1771  * Build an RPC by the list of extent @ext_list. The caller must ensure
1772  * that the total pages in this list are NOT over max pages per RPC.
1773  * Extents in the list must be in OES_RPC state.
 *
 * \param env       execution environment
 * \param cli       client obd the RPC is accounted against
 * \param ext_list  non-empty list of extents; on success the extents are
 *                  moved onto the request's async args, on failure each one
 *                  is finished with the error
 * \param cmd       OBD_BRW_* command; the OBD_BRW_WRITE bit selects a write
 * \param pol       ptlrpcd policy passed through to ptlrpcd_add_req()
 *
 * \retval 0        the request was handed to ptlrpcd
 * \retval <0       error; no request was queued
1774  */
1775 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1776                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1777 {
1778         struct ptlrpc_request           *req = NULL;
1779         struct osc_extent               *ext;
1780         struct brw_page                 **pga = NULL;
1781         struct osc_brw_async_args       *aa = NULL;
1782         struct obdo                     *oa = NULL;
1783         struct osc_async_page           *oap;
1784         struct osc_async_page           *tmp;
1785         struct cl_req                   *clerq = NULL;
1786         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1787                                                                       CRT_READ;
1788         struct cl_req_attr              *crattr = NULL;
1789         obd_off                         starting_offset = OBD_OBJECT_EOF;
1790         obd_off                         ending_offset = 0;
1791         int                             mpflag = 0;
1792         int                             mem_tight = 0;
1793         int                             page_count = 0;
1794         bool                            soft_sync = false;
1795         int                             i;
1796         int                             rc;
1797         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1798         struct ost_body                 *body;
1799         ENTRY;
1800         LASSERT(!list_empty(ext_list));
1801
1802         /* add pages into rpc_list to build BRW rpc */
1803         list_for_each_entry(ext, ext_list, oe_link) {
1804                 LASSERT(ext->oe_state == OES_RPC);
1805                 mem_tight |= ext->oe_memalloc;
1806                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1807                         ++page_count;
1808                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
                     /* Track the lowest start and highest end offset.  A
                      * page that does not move the start must begin at
                      * page offset 0; one that does not move the end must
                      * run up to the end of its page. */
1809                         if (starting_offset > oap->oap_obj_off)
1810                                 starting_offset = oap->oap_obj_off;
1811                         else
1812                                 LASSERT(oap->oap_page_off == 0);
1813                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1814                                 ending_offset = oap->oap_obj_off +
1815                                                 oap->oap_count;
1816                         else
1817                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1818                                         PAGE_CACHE_SIZE);
1819                 }
1820         }
1821
1822         soft_sync = osc_over_unstable_soft_limit(cli);
     /* the saved state is restored at the 'out' label */
1823         if (mem_tight)
1824                 mpflag = cfs_memory_pressure_get_and_set();
1825
1826         OBD_ALLOC(crattr, sizeof(*crattr));
1827         if (crattr == NULL)
1828                 GOTO(out, rc = -ENOMEM);
1829
1830         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1831         if (pga == NULL)
1832                 GOTO(out, rc = -ENOMEM);
1833
1834         OBDO_ALLOC(oa);
1835         if (oa == NULL)
1836                 GOTO(out, rc = -ENOMEM);
1837
     /* fill the page array, creating the cl_req from the first page */
1838         i = 0;
1839         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1840                 struct cl_page *page = oap2cl_page(oap);
1841                 if (clerq == NULL) {
1842                         clerq = cl_req_alloc(env, page, crt,
1843                                              1 /* only 1-object rpcs for now */);
1844                         if (IS_ERR(clerq))
1845                                 GOTO(out, rc = PTR_ERR(clerq));
1846                 }
1847                 if (mem_tight)
1848                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1849                 if (soft_sync)
1850                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1851                 pga[i] = &oap->oap_brw_page;
1852                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1853                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1854                        pga[i]->pg, page_index(oap->oap_page), oap,
1855                        pga[i]->flag);
1856                 i++;
1857                 cl_req_page_add(env, clerq, page);
1858         }
1859
1860         /* always get the data for the obdo for the rpc */
1861         LASSERT(clerq != NULL);
1862         crattr->cra_oa = oa;
1863         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1864
1865         rc = cl_req_prep(env, clerq);
1866         if (rc != 0) {
1867                 CERROR("cl_req_prep failed: %d\n", rc);
1868                 GOTO(out, rc);
1869         }
1870
1871         sort_brw_pages(pga, page_count);
1872         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1873                         pga, &req, crattr->cra_capa, 1, 0);
1874         if (rc != 0) {
1875                 CERROR("prep_req failed: %d\n", rc);
1876                 GOTO(out, rc);
1877         }
1878
1879         req->rq_commit_cb = brw_commit;
1880         req->rq_interpret_reply = brw_interpret;
1881
1882         if (mem_tight != 0)
1883                 req->rq_memalloc = 1;
1884
1885         /* Need to update the timestamps after the request is built in case
1886          * we race with setattr (locally or in queue at OST).  If OST gets
1887          * later setattr before earlier BRW (as determined by the request xid),
1888          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1889          * way to do this in a single call.  bug 10150 */
1890         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1891         crattr->cra_oa = &body->oa;
1892         cl_req_attr_set(env, clerq, crattr,
1893                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1894
1895         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1896
     /* move the pages and extents onto the request's async args */
1897         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1898         aa = ptlrpc_req_async_args(req);
1899         INIT_LIST_HEAD(&aa->aa_oaps);
1900         list_splice_init(&rpc_list, &aa->aa_oaps);
1901         INIT_LIST_HEAD(&aa->aa_exts);
1902         list_splice_init(ext_list, &aa->aa_exts);
1903         aa->aa_clerq = clerq;
1904
1905         /* queued sync pages can be torn down while the pages
1906          * were between the pending list and the rpc */
1907         tmp = NULL;
1908         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1909                 /* only one oap gets a request reference */
1910                 if (tmp == NULL)
1911                         tmp = oap;
1912                 if (oap->oap_interrupted && !req->rq_intr) {
1913                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1914                                         oap, req);
1915                         ptlrpc_mark_interrupted(req);
1916                 }
1917         }
1918         if (tmp != NULL)
1919                 tmp->oap_request = ptlrpc_request_addref(req);
1920
     /* in-flight counters and histograms are updated under the list lock;
      * starting_offset is converted from bytes to a page index first */
1921         spin_lock(&cli->cl_loi_list_lock);
1922         starting_offset >>= PAGE_CACHE_SHIFT;
1923         if (cmd == OBD_BRW_READ) {
1924                 cli->cl_r_in_flight++;
1925                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1926                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1927                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1928                                       starting_offset + 1);
1929         } else {
1930                 cli->cl_w_in_flight++;
1931                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1932                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1933                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1934                                       starting_offset + 1);
1935         }
1936         spin_unlock(&cli->cl_loi_list_lock);
1937
1938         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1939                   page_count, aa, cli->cl_r_in_flight,
1940                   cli->cl_w_in_flight);
1941
1942         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1943          * see which CPU/NUMA node the majority of pages were allocated
1944          * on, and try to assign the async RPC to the CPU core
1945          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1946          *
1947          * But on the other hand, we expect that multiple ptlrpcd
1948          * threads and the initial write sponsor can run in parallel,
1949          * especially when data checksum is enabled, which is CPU-bound
1950          * operation and single ptlrpcd thread cannot process in time.
1951          * So more ptlrpcd threads sharing BRW load
1952          * (with PDL_POLICY_ROUND) seems better.
1953          */
1954         ptlrpcd_add_req(req, pol, -1);
1955         rc = 0;
1956         EXIT;
1957
     /* common exit: reached on success (rc == 0) and on every failure */
1958 out:
1959         if (mem_tight != 0)
1960                 cfs_memory_pressure_restore(mpflag);
1961
1962         if (crattr != NULL) {
1963                 capa_put(crattr->cra_capa);
1964                 OBD_FREE(crattr, sizeof(*crattr));
1965         }
1966
1967         if (rc != 0) {
1968                 LASSERT(req == NULL);
1969
1970                 if (oa)
1971                         OBDO_FREE(oa);
1972                 if (pga)
1973                         OBD_FREE(pga, sizeof(*pga) * page_count);
1974                 /* this should happen rarely and is pretty bad, it makes the
1975                  * pending list not follow the dirty order */
1976                 while (!list_empty(ext_list)) {
1977                         ext = list_entry(ext_list->next, struct osc_extent,
1978                                          oe_link);
1979                         list_del_init(&ext->oe_link);
1980                         osc_extent_finish(env, ext, 0, rc);
1981                 }
             /* clerq may hold an ERR_PTR if cl_req_alloc() failed */
1982                 if (clerq && !IS_ERR(clerq))
1983                         cl_req_completion(env, clerq, rc);
1984         }
1985         RETURN(rc);
1986 }
1987
1988 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1989                                         struct ldlm_enqueue_info *einfo)
1990 {
1991         void *data = einfo->ei_cbdata;
1992         int set = 0;
1993
1994         LASSERT(lock != NULL);
1995         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1996         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1997         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1998         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1999
2000         lock_res_and_lock(lock);
2001
2002         if (lock->l_ast_data == NULL)
2003                 lock->l_ast_data = data;
2004         if (lock->l_ast_data == data)
2005                 set = 1;
2006
2007         unlock_res_and_lock(lock);
2008
2009         return set;
2010 }
2011
2012 static int osc_set_data_with_check(struct lustre_handle *lockh,
2013                                    struct ldlm_enqueue_info *einfo)
2014 {
2015         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2016         int set = 0;
2017
2018         if (lock != NULL) {
2019                 set = osc_set_lock_data_with_check(lock, einfo);
2020                 LDLM_LOCK_PUT(lock);
2021         } else
2022                 CERROR("lockh %p, data %p - client evicted?\n",
2023                        lockh, einfo->ei_cbdata);
2024         return set;
2025 }
2026
2027 static int osc_enqueue_fini(struct ptlrpc_request *req,
2028                             osc_enqueue_upcall_f upcall, void *cookie,
2029                             struct lustre_handle *lockh, ldlm_mode_t mode,
2030                             __u64 *flags, int agl, int errcode)
2031 {
2032         bool intent = *flags & LDLM_FL_HAS_INTENT;
2033         int rc;
2034         ENTRY;
2035
2036         /* The request was created before ldlm_cli_enqueue call. */
2037         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2038                 struct ldlm_reply *rep;
2039
2040                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2041                 LASSERT(rep != NULL);
2042
2043                 rep->lock_policy_res1 =
2044                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2045                 if (rep->lock_policy_res1)
2046                         errcode = rep->lock_policy_res1;
2047                 if (!agl)
2048                         *flags |= LDLM_FL_LVB_READY;
2049         } else if (errcode == ELDLM_OK) {
2050                 *flags |= LDLM_FL_LVB_READY;
2051         }
2052
2053         /* Call the update callback. */
2054         rc = (*upcall)(cookie, lockh, errcode);
2055
2056         /* release the reference taken in ldlm_cli_enqueue() */
2057         if (errcode == ELDLM_LOCK_MATCHED)
2058                 errcode = ELDLM_OK;
2059         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2060                 ldlm_lock_decref(lockh, mode);
2061
2062         RETURN(rc);
2063 }
2064
/*
 * Reply interpreter for a lock enqueue request.
 *
 * Resolves the lock from the handle stored in \a aa, pins it with an extra
 * mode reference, completes the enqueue through ldlm_cli_enqueue_fini() and
 * osc_enqueue_fini(), then drops both the extra mode reference and the
 * lock reference obtained from the handle.
 *
 * \param env  execution environment (not used here)
 * \param req  the enqueue request
 * \param aa   enqueue arguments saved with the request
 * \param rc   completion status handed in by the caller
 *
 * \return the value returned by osc_enqueue_fini()
 */
2065 static int osc_enqueue_interpret(const struct lu_env *env,
2066                                  struct ptlrpc_request *req,
2067                                  struct osc_enqueue_args *aa, int rc)
2068 {
2069         struct ldlm_lock *lock;
2070         struct lustre_handle *lockh = &aa->oa_lockh;
2071         ldlm_mode_t mode = aa->oa_mode;
2072         struct ost_lvb *lvb = aa->oa_lvb;
2073         __u32 lvb_len = sizeof(*lvb);
2074         __u64 flags = 0;
2075
2076         ENTRY;
2077
2078         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2079          * be valid. */
2080         lock = ldlm_handle2lock(lockh);
2081         LASSERTF(lock != NULL,
2082                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2083                  lockh->cookie, req, aa);
2084
2085         /* Take an additional reference so that a blocking AST that
2086          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2087          * to arrive after an upcall has been executed by
2088          * osc_enqueue_fini(). */
2089         ldlm_lock_addref(lockh, mode);
2090
2091         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2092         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2093
2094         /* Let CP AST to grant the lock first. */
2095         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2096
     /* An AGL enqueue arrives without an LVB buffer or a flags word; point
      * oa_flags at the local 'flags' so the calls below have one to use. */
2097         if (aa->oa_agl) {
2098                 LASSERT(aa->oa_lvb == NULL);
2099                 LASSERT(aa->oa_flags == NULL);
2100                 aa->oa_flags = &flags;
2101         }
2102
2103         /* Complete obtaining the lock procedure. */
2104         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2105                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2106                                    lockh, rc);
2107         /* Complete osc stuff. */
2108         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2109                               aa->oa_flags, aa->oa_agl, rc);
2110
2111         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2112
     /* undo ldlm_lock_addref() and ldlm_handle2lock() from above */
2113         ldlm_lock_decref(lockh, mode);
2114         LDLM_LOCK_PUT(lock);
2115         RETURN(rc);
2116 }
2117
/* Sentinel request-set pointer: the value 1 is never dereferenced as a real
 * set.  NOTE(review): presumably callers pass it as "rqset" to ask for the
 * request to be handed to ptlrpcd -- confirm against the users. */
2118 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2119
2120 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2121  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2122  * other synchronous requests, however keeping some locks and trying to obtain
2123  * others may take a considerable amount of time in a case of ost failure; and
2124  * when other sync requests do not get released lock from a client, the client
2125  * is evicted from the cluster -- such scenarios make life difficult, so
2126  * release locks just after they are obtained. */
2127 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2128                      __u64 *flags, ldlm_policy_data_t *policy,
2129                      struct ost_lvb *lvb, int kms_valid,
2130                      osc_enqueue_upcall_f upcall, void *cookie,
2131                      struct ldlm_enqueue_info *einfo,
2132                      struct ptlrpc_request_set *rqset, int async, int agl)
2133 {
2134         struct obd_device *obd = exp->exp_obd;
2135         struct lustre_handle lockh = { 0 };
2136         struct ptlrpc_request *req = NULL;
2137         int intent = *flags & LDLM_FL_HAS_INTENT;
2138         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2139         ldlm_mode_t mode;
2140         int rc;
2141         ENTRY;
2142
2143         /* Filesystem lock extents are extended to page boundaries so that
2144          * dealing with the page cache is a little smoother.  */
2145         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2146         policy->l_extent.end |= ~CFS_PAGE_MASK;
2147
2148         /*
2149          * kms is not valid when either object is completely fresh (so that no
2150          * locks are cached), or object was evicted. In the latter case cached
2151          * lock cannot be used, because it would prime inode state with
2152          * potentially stale LVB.
2153          */
2154         if (!kms_valid)
2155                 goto no_match;
2156
2157         /* Next, search for already existing extent locks that will cover us */
2158         /* If we're trying to read, we also search for an existing PW lock.  The
2159          * VFS and page cache already protect us locally, so lots of readers/
2160          * writers can share a single PW lock.
2161          *
2162          * There are problems with conversion deadlocks, so instead of
2163          * converting a read lock to a write lock, we'll just enqueue a new
2164          * one.
2165          *
2166          * At some point we should cancel the read lock instead of making them
2167          * send us a blocking callback, but there are problems with canceling
2168          * locks out from other users right now, too. */
2169         mode = einfo->ei_mode;
2170         if (einfo->ei_mode == LCK_PR)
2171                 mode |= LCK_PW;
2172         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2173                                einfo->ei_type, policy, mode, &lockh, 0);
2174         if (mode) {
2175                 struct ldlm_lock *matched;
2176
2177                 if (*flags & LDLM_FL_TEST_LOCK)
2178                         RETURN(ELDLM_OK);
2179
2180                 matched = ldlm_handle2lock(&lockh);
2181                 if (agl) {
2182                         /* AGL enqueues DLM locks speculatively. Therefore if
2183                          * it already exists a DLM lock, it wll just inform the
2184                          * caller to cancel the AGL process for this stripe. */
2185                         ldlm_lock_decref(&lockh, mode);
2186                         LDLM_LOCK_PUT(matched);
2187                         RETURN(-ECANCELED);
2188                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2189                         *flags |= LDLM_FL_LVB_READY;
2190
2191                         /* We already have a lock, and it's referenced. */
2192                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2193
2194                         ldlm_lock_decref(&lockh, mode);
2195                         LDLM_LOCK_PUT(matched);
2196                         RETURN(ELDLM_OK);
2197                 } else {
2198                         ldlm_lock_decref(&lockh, mode);
2199                         LDLM_LOCK_PUT(matched);
2200                 }
2201         }
2202
2203 no_match:
2204         if (*flags & LDLM_FL_TEST_LOCK)
2205                 RETURN(-ENOLCK);
2206
2207         if (intent) {
2208                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2209                                            &RQF_LDLM_ENQUEUE_LVB);
2210                 if (req == NULL)
2211                         RETURN(-ENOMEM);
2212
2213                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2214                 if (rc < 0) {
2215                         ptlrpc_request_free(req);
2216                         RETURN(rc);
2217                 }
2218
2219                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2220                                      sizeof *lvb);
2221                 ptlrpc_request_set_replen(req);
2222         }
2223
2224         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2225         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2226
2227         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2228                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2229         if (async) {
2230                 if (!rc) {
2231                         struct osc_enqueue_args *aa;
2232                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2233                         aa = ptlrpc_req_async_args(req);
2234                         aa->oa_exp    = exp;
2235                         aa->oa_mode   = einfo->ei_mode;
2236                         aa->oa_type   = einfo->ei_type;
2237                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2238                         aa->oa_upcall = upcall;
2239                         aa->oa_cookie = cookie;
2240                         aa->oa_agl    = !!agl;
2241                         if (!agl) {
2242                                 aa->oa_flags  = flags;
2243                                 aa->oa_lvb    = lvb;
2244                         } else {
2245                                 /* AGL is essentially to enqueue an DLM lock
2246                                  * in advance, so we don't care about the
2247                                  * result of AGL enqueue. */
2248                                 aa->oa_lvb    = NULL;
2249                                 aa->oa_flags  = NULL;
2250                         }
2251
2252                         req->rq_interpret_reply =
2253                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2254                         if (rqset == PTLRPCD_SET)
2255                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2256                         else
2257                                 ptlrpc_set_add_req(rqset, req);
2258                 } else if (intent) {
2259                         ptlrpc_req_finished(req);
2260                 }
2261                 RETURN(rc);
2262         }
2263
2264         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2265                               flags, agl, rc);
2266         if (intent)
2267                 ptlrpc_req_finished(req);
2268
2269         RETURN(rc);
2270 }
2271
2272 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2273                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2274                    __u64 *flags, void *data, struct lustre_handle *lockh,
2275                    int unref)
2276 {
2277         struct obd_device *obd = exp->exp_obd;
2278         __u64 lflags = *flags;
2279         ldlm_mode_t rc;
2280         ENTRY;
2281
2282         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2283                 RETURN(-EIO);
2284
2285         /* Filesystem lock extents are extended to page boundaries so that
2286          * dealing with the page cache is a little smoother */
2287         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2288         policy->l_extent.end |= ~CFS_PAGE_MASK;
2289
2290         /* Next, search for already existing extent locks that will cover us */
2291         /* If we're trying to read, we also search for an existing PW lock.  The
2292          * VFS and page cache already protect us locally, so lots of readers/
2293          * writers can share a single PW lock. */
2294         rc = mode;
2295         if (mode == LCK_PR)
2296                 rc |= LCK_PW;
2297         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2298                              res_id, type, policy, rc, lockh, unref);
2299         if (rc) {
2300                 if (data != NULL) {
2301                         if (!osc_set_data_with_check(lockh, data)) {
2302                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2303                                         ldlm_lock_decref(lockh, rc);
2304                                 RETURN(0);
2305                         }
2306                 }
2307                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2308                         ldlm_lock_addref(lockh, LCK_PR);
2309                         ldlm_lock_decref(lockh, LCK_PW);
2310                 }
2311                 RETURN(rc);
2312         }
2313         RETURN(rc);
2314 }
2315
2316 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2317 {
2318         ENTRY;
2319
2320         if (unlikely(mode == LCK_GROUP))
2321                 ldlm_lock_decref_and_cancel(lockh, mode);
2322         else
2323                 ldlm_lock_decref(lockh, mode);
2324
2325         RETURN(0);
2326 }
2327
2328 static int osc_statfs_interpret(const struct lu_env *env,
2329                                 struct ptlrpc_request *req,
2330                                 struct osc_async_args *aa, int rc)
2331 {
2332         struct obd_statfs *msfs;
2333         ENTRY;
2334
2335         if (rc == -EBADR)
2336                 /* The request has in fact never been sent
2337                  * due to issues at a higher level (LOV).
2338                  * Exit immediately since the caller is
2339                  * aware of the problem and takes care
2340                  * of the clean up */
2341                  RETURN(rc);
2342
2343         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2344             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2345                 GOTO(out, rc = 0);
2346
2347         if (rc != 0)
2348                 GOTO(out, rc);
2349
2350         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2351         if (msfs == NULL) {
2352                 GOTO(out, rc = -EPROTO);
2353         }
2354
2355         *aa->aa_oi->oi_osfs = *msfs;
2356 out:
2357         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2358         RETURN(rc);
2359 }
2360
2361 static int osc_statfs_async(struct obd_export *exp,
2362                             struct obd_info *oinfo, __u64 max_age,
2363                             struct ptlrpc_request_set *rqset)
2364 {
2365         struct obd_device     *obd = class_exp2obd(exp);
2366         struct ptlrpc_request *req;
2367         struct osc_async_args *aa;
2368         int                    rc;
2369         ENTRY;
2370
2371         /* We could possibly pass max_age in the request (as an absolute
2372          * timestamp or a "seconds.usec ago") so the target can avoid doing
2373          * extra calls into the filesystem if that isn't necessary (e.g.
2374          * during mount that would help a bit).  Having relative timestamps
2375          * is not so great if request processing is slow, while absolute
2376          * timestamps are not ideal because they need time synchronization. */
2377         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2378         if (req == NULL)
2379                 RETURN(-ENOMEM);
2380
2381         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2382         if (rc) {
2383                 ptlrpc_request_free(req);
2384                 RETURN(rc);
2385         }
2386         ptlrpc_request_set_replen(req);
2387         req->rq_request_portal = OST_CREATE_PORTAL;
2388         ptlrpc_at_set_req_timeout(req);
2389
2390         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2391                 /* procfs requests not want stat in wait for avoid deadlock */
2392                 req->rq_no_resend = 1;
2393                 req->rq_no_delay = 1;
2394         }
2395
2396         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2397         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2398         aa = ptlrpc_req_async_args(req);
2399         aa->aa_oi = oinfo;
2400
2401         ptlrpc_set_add_req(rqset, req);
2402         RETURN(0);
2403 }
2404
2405 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2406                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2407 {
2408         struct obd_device     *obd = class_exp2obd(exp);
2409         struct obd_statfs     *msfs;
2410         struct ptlrpc_request *req;
2411         struct obd_import     *imp = NULL;
2412         int rc;
2413         ENTRY;
2414
2415         /*Since the request might also come from lprocfs, so we need
2416          *sync this with client_disconnect_export Bug15684*/
2417         down_read(&obd->u.cli.cl_sem);
2418         if (obd->u.cli.cl_import)
2419                 imp = class_import_get(obd->u.cli.cl_import);
2420         up_read(&obd->u.cli.cl_sem);
2421         if (!imp)
2422                 RETURN(-ENODEV);
2423
2424         /* We could possibly pass max_age in the request (as an absolute
2425          * timestamp or a "seconds.usec ago") so the target can avoid doing
2426          * extra calls into the filesystem if that isn't necessary (e.g.
2427          * during mount that would help a bit).  Having relative timestamps
2428          * is not so great if request processing is slow, while absolute
2429          * timestamps are not ideal because they need time synchronization. */
2430         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2431
2432         class_import_put(imp);
2433
2434         if (req == NULL)
2435                 RETURN(-ENOMEM);
2436
2437         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2438         if (rc) {
2439                 ptlrpc_request_free(req);
2440                 RETURN(rc);
2441         }
2442         ptlrpc_request_set_replen(req);
2443         req->rq_request_portal = OST_CREATE_PORTAL;
2444         ptlrpc_at_set_req_timeout(req);
2445
2446         if (flags & OBD_STATFS_NODELAY) {
2447                 /* procfs requests not want stat in wait for avoid deadlock */
2448                 req->rq_no_resend = 1;
2449                 req->rq_no_delay = 1;
2450         }
2451
2452         rc = ptlrpc_queue_wait(req);
2453         if (rc)
2454                 GOTO(out, rc);
2455
2456         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2457         if (msfs == NULL) {
2458                 GOTO(out, rc = -EPROTO);
2459         }
2460
2461         *osfs = *msfs;
2462
2463         EXIT;
2464  out:
2465         ptlrpc_req_finished(req);
2466         return rc;
2467 }
2468
2469 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2470                          void *karg, void *uarg)
2471 {
2472         struct obd_device *obd = exp->exp_obd;
2473         struct obd_ioctl_data *data = karg;
2474         int err = 0;
2475         ENTRY;
2476
2477         if (!try_module_get(THIS_MODULE)) {
2478                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2479                        module_name(THIS_MODULE));
2480                 return -EINVAL;
2481         }
2482         switch (cmd) {
2483         case OBD_IOC_CLIENT_RECOVER:
2484                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2485                                             data->ioc_inlbuf1, 0);
2486                 if (err > 0)
2487                         err = 0;
2488                 GOTO(out, err);
2489         case IOC_OSC_SET_ACTIVE:
2490                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2491                                                data->ioc_offset);
2492                 GOTO(out, err);
2493         case OBD_IOC_POLL_QUOTACHECK:
2494                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2495                 GOTO(out, err);
2496         case OBD_IOC_PING_TARGET:
2497                 err = ptlrpc_obd_ping(obd);
2498                 GOTO(out, err);
2499         default:
2500                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2501                        cmd, current_comm());
2502                 GOTO(out, err = -ENOTTY);
2503         }
2504 out:
2505         module_put(THIS_MODULE);
2506         return err;
2507 }
2508
2509 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2510                               obd_count keylen, void *key, obd_count vallen,
2511                               void *val, struct ptlrpc_request_set *set)
2512 {
2513         struct ptlrpc_request *req;
2514         struct obd_device     *obd = exp->exp_obd;
2515         struct obd_import     *imp = class_exp2cliimp(exp);
2516         char                  *tmp;
2517         int                    rc;
2518         ENTRY;
2519
2520         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2521
2522         if (KEY_IS(KEY_CHECKSUM)) {
2523                 if (vallen != sizeof(int))
2524                         RETURN(-EINVAL);
2525                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2526                 RETURN(0);
2527         }
2528
2529         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2530                 sptlrpc_conf_client_adapt(obd);
2531                 RETURN(0);
2532         }
2533
2534         if (KEY_IS(KEY_FLUSH_CTX)) {
2535                 sptlrpc_import_flush_my_ctx(imp);
2536                 RETURN(0);
2537         }
2538
2539         if (KEY_IS(KEY_CACHE_SET)) {
2540                 struct client_obd *cli = &obd->u.cli;
2541
2542                 LASSERT(cli->cl_cache == NULL); /* only once */
2543                 cli->cl_cache = (struct cl_client_cache *)val;
2544                 cl_cache_incref(cli->cl_cache);
2545                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2546
2547                 /* add this osc into entity list */
2548                 LASSERT(list_empty(&cli->cl_lru_osc));
2549                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2550                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2551                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2552
2553                 RETURN(0);
2554         }
2555
2556         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2557                 struct client_obd *cli = &obd->u.cli;
2558                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2559                 long target = *(long *)val;
2560
2561                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2562                 *(long *)val -= nr;
2563                 RETURN(0);
2564         }
2565
2566         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2567                 RETURN(-EINVAL);
2568
2569         /* We pass all other commands directly to OST. Since nobody calls osc
2570            methods directly and everybody is supposed to go through LOV, we
2571            assume lov checked invalid values for us.
2572            The only recognised values so far are evict_by_nid and mds_conn.
2573            Even if something bad goes through, we'd get a -EINVAL from OST
2574            anyway. */
2575
2576         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2577                                                 &RQF_OST_SET_GRANT_INFO :
2578                                                 &RQF_OBD_SET_INFO);
2579         if (req == NULL)
2580                 RETURN(-ENOMEM);
2581
2582         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2583                              RCL_CLIENT, keylen);
2584         if (!KEY_IS(KEY_GRANT_SHRINK))
2585                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2586                                      RCL_CLIENT, vallen);
2587         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2588         if (rc) {
2589                 ptlrpc_request_free(req);
2590                 RETURN(rc);
2591         }
2592
2593         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2594         memcpy(tmp, key, keylen);
2595         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2596                                                         &RMF_OST_BODY :
2597                                                         &RMF_SETINFO_VAL);
2598         memcpy(tmp, val, vallen);
2599
2600         if (KEY_IS(KEY_GRANT_SHRINK)) {
2601                 struct osc_grant_args *aa;
2602                 struct obdo *oa;
2603
2604                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2605                 aa = ptlrpc_req_async_args(req);
2606                 OBDO_ALLOC(oa);
2607                 if (!oa) {
2608                         ptlrpc_req_finished(req);
2609                         RETURN(-ENOMEM);
2610                 }
2611                 *oa = ((struct ost_body *)val)->oa;
2612                 aa->aa_oa = oa;
2613                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2614         }
2615
2616         ptlrpc_request_set_replen(req);
2617         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2618                 LASSERT(set != NULL);
2619                 ptlrpc_set_add_req(set, req);
2620                 ptlrpc_check_set(NULL, set);
2621         } else
2622                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2623
2624         RETURN(0);
2625 }
2626
2627 static int osc_reconnect(const struct lu_env *env,
2628                          struct obd_export *exp, struct obd_device *obd,
2629                          struct obd_uuid *cluuid,
2630                          struct obd_connect_data *data,
2631                          void *localdata)
2632 {
2633         struct client_obd *cli = &obd->u.cli;
2634
2635         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2636                 long lost_grant;
2637
2638                 spin_lock(&cli->cl_loi_list_lock);
2639                 data->ocd_grant = (cli->cl_avail_grant +
2640                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2641                                   2 * cli_brw_size(obd);
2642                 lost_grant = cli->cl_lost_grant;
2643                 cli->cl_lost_grant = 0;
2644                 spin_unlock(&cli->cl_loi_list_lock);
2645
2646                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2647                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2648                        data->ocd_version, data->ocd_grant, lost_grant);
2649         }
2650
2651         RETURN(0);
2652 }
2653
2654 static int osc_disconnect(struct obd_export *exp)
2655 {
2656         struct obd_device *obd = class_exp2obd(exp);
2657         int rc;
2658
2659         rc = client_disconnect_export(exp);
2660         /**
2661          * Initially we put del_shrink_grant before disconnect_export, but it
2662          * causes the following problem if setup (connect) and cleanup
2663          * (disconnect) are tangled together.
2664          *      connect p1                     disconnect p2
2665          *   ptlrpc_connect_import
2666          *     ...............               class_manual_cleanup
2667          *                                     osc_disconnect
2668          *                                     del_shrink_grant
2669          *   ptlrpc_connect_interrupt
2670          *     init_grant_shrink
2671          *   add this client to shrink list
2672          *                                      cleanup_osc
2673          * Bang! pinger trigger the shrink.
2674          * So the osc should be disconnected from the shrink list, after we
2675          * are sure the import has been destroyed. BUG18662
2676          */
2677         if (obd->u.cli.cl_import == NULL)
2678                 osc_del_shrink_grant(&obd->u.cli);
2679         return rc;
2680 }
2681
2682 static int osc_import_event(struct obd_device *obd,
2683                             struct obd_import *imp,
2684                             enum obd_import_event event)
2685 {
2686         struct client_obd *cli;
2687         int rc = 0;
2688
2689         ENTRY;
2690         LASSERT(imp->imp_obd == obd);
2691
2692         switch (event) {
2693         case IMP_EVENT_DISCON: {
2694                 cli = &obd->u.cli;
2695                 spin_lock(&cli->cl_loi_list_lock);
2696                 cli->cl_avail_grant = 0;
2697                 cli->cl_lost_grant = 0;
2698                 spin_unlock(&cli->cl_loi_list_lock);
2699                 break;
2700         }
2701         case IMP_EVENT_INACTIVE: {
2702                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2703                 break;
2704         }
2705         case IMP_EVENT_INVALIDATE: {
2706                 struct ldlm_namespace *ns = obd->obd_namespace;
2707                 struct lu_env         *env;
2708                 int                    refcheck;
2709
2710                 env = cl_env_get(&refcheck);
2711                 if (!IS_ERR(env)) {
2712                         /* Reset grants */
2713                         cli = &obd->u.cli;
2714                         /* all pages go to failing rpcs due to the invalid
2715                          * import */
2716                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2717
2718                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2719                         cl_env_put(env, &refcheck);
2720                 } else
2721                         rc = PTR_ERR(env);
2722                 break;
2723         }
2724         case IMP_EVENT_ACTIVE: {
2725                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2726                 break;
2727         }
2728         case IMP_EVENT_OCD: {
2729                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2730
2731                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2732                         osc_init_grant(&obd->u.cli, ocd);
2733
2734                 /* See bug 7198 */
2735                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2736                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2737
2738                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2739                 break;
2740         }
2741         case IMP_EVENT_DEACTIVATE: {
2742                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2743                 break;
2744         }
2745         case IMP_EVENT_ACTIVATE: {
2746                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2747                 break;
2748         }
2749         default:
2750                 CERROR("Unknown import event %d\n", event);
2751                 LBUG();
2752         }
2753         RETURN(rc);
2754 }
2755
2756 /**
2757  * Determine whether the lock can be canceled before replaying the lock
2758  * during recovery, see bug16774 for detailed information.
2759  *
2760  * \retval zero the lock can't be canceled
2761  * \retval other ok to cancel
2762  */
2763 static int osc_cancel_weight(struct ldlm_lock *lock)
2764 {
2765         /*
2766          * Cancel all unused and granted extent lock.
2767          */
2768         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2769             lock->l_granted_mode == lock->l_req_mode &&
2770             osc_ldlm_weigh_ast(lock) == 0)
2771                 RETURN(1);
2772
2773         RETURN(0);
2774 }
2775
2776 static int brw_queue_work(const struct lu_env *env, void *data)
2777 {
2778         struct client_obd *cli = data;
2779
2780         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2781
2782         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2783         RETURN(0);
2784 }
2785
2786 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2787 {
2788         struct client_obd *cli = &obd->u.cli;
2789         struct obd_type   *type;
2790         void              *handler;
2791         int                rc;
2792         ENTRY;
2793
2794         rc = ptlrpcd_addref();
2795         if (rc)
2796                 RETURN(rc);
2797
2798         rc = client_obd_setup(obd, lcfg);
2799         if (rc)
2800                 GOTO(out_ptlrpcd, rc);
2801
2802         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2803         if (IS_ERR(handler))
2804                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2805         cli->cl_writeback_work = handler;
2806
2807         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2808         if (IS_ERR(handler))
2809                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2810         cli->cl_lru_work = handler;
2811
2812         rc = osc_quota_setup(obd);
2813         if (rc)
2814                 GOTO(out_ptlrpcd_work, rc);
2815
2816         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2817
2818 #ifdef CONFIG_PROC_FS
2819         obd->obd_vars = lprocfs_osc_obd_vars;
2820 #endif
2821         /* If this is true then both client (osc) and server (osp) are on the
2822          * same node. The osp layer if loaded first will register the osc proc
2823          * directory. In that case this obd_device will be attached its proc
2824          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2825         type = class_search_type(LUSTRE_OSP_NAME);
2826         if (type && type->typ_procsym) {
2827                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2828                                                        type->typ_procsym,
2829                                                        obd->obd_vars, obd);
2830                 if (IS_ERR(obd->obd_proc_entry)) {
2831                         rc = PTR_ERR(obd->obd_proc_entry);
2832                         CERROR("error %d setting up lprocfs for %s\n", rc,
2833                                obd->obd_name);
2834                         obd->obd_proc_entry = NULL;
2835                 }
2836         } else {
2837                 rc = lprocfs_obd_setup(obd);
2838         }
2839
2840         /* If the basic OSC proc tree construction succeeded then
2841          * lets do the rest. */
2842         if (rc == 0) {
2843                 lproc_osc_attach_seqstat(obd);
2844                 sptlrpc_lprocfs_cliobd_attach(obd);
2845                 ptlrpc_lprocfs_register_obd(obd);
2846         }
2847
2848         /* We need to allocate a few requests more, because
2849          * brw_interpret tries to create new requests before freeing
2850          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2851          * reserved, but I'm afraid that might be too much wasted RAM
2852          * in fact, so 2 is just my guess and still should work. */
2853         cli->cl_import->imp_rq_pool =
2854                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2855                                     OST_MAXREQSIZE,
2856                                     ptlrpc_add_rqs_to_pool);
2857
2858         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2859         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2860         RETURN(0);
2861
2862 out_ptlrpcd_work:
2863         if (cli->cl_writeback_work != NULL) {
2864                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2865                 cli->cl_writeback_work = NULL;
2866         }
2867         if (cli->cl_lru_work != NULL) {
2868                 ptlrpcd_destroy_work(cli->cl_lru_work);
2869                 cli->cl_lru_work = NULL;
2870         }
2871 out_client_setup:
2872         client_obd_cleanup(obd);
2873 out_ptlrpcd:
2874         ptlrpcd_decref();
2875         RETURN(rc);
2876 }
2877
2878 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2879 {
2880         int rc = 0;
2881         ENTRY;
2882
2883         switch (stage) {
2884         case OBD_CLEANUP_EARLY: {
2885                 struct obd_import *imp;
2886                 imp = obd->u.cli.cl_import;
2887                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2888                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2889                 ptlrpc_deactivate_import(imp);
2890                 spin_lock(&imp->imp_lock);
2891                 imp->imp_pingable = 0;
2892                 spin_unlock(&imp->imp_lock);
2893                 break;
2894         }
2895         case OBD_CLEANUP_EXPORTS: {
2896                 struct client_obd *cli = &obd->u.cli;
2897                 /* LU-464
2898                  * for echo client, export may be on zombie list, wait for
2899                  * zombie thread to cull it, because cli.cl_import will be
2900                  * cleared in client_disconnect_export():
2901                  *   class_export_destroy() -> obd_cleanup() ->
2902                  *   echo_device_free() -> echo_client_cleanup() ->
2903                  *   obd_disconnect() -> osc_disconnect() ->
2904                  *   client_disconnect_export()
2905                  */
2906                 obd_zombie_barrier();
2907                 if (cli->cl_writeback_work) {
2908                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2909                         cli->cl_writeback_work = NULL;
2910                 }
2911                 if (cli->cl_lru_work) {
2912                         ptlrpcd_destroy_work(cli->cl_lru_work);
2913                         cli->cl_lru_work = NULL;
2914                 }
2915                 obd_cleanup_client_import(obd);
2916                 ptlrpc_lprocfs_unregister_obd(obd);
2917                 lprocfs_obd_cleanup(obd);
2918                 break;
2919                 }
2920         }
2921         RETURN(rc);
2922 }
2923
2924 int osc_cleanup(struct obd_device *obd)
2925 {
2926         struct client_obd *cli = &obd->u.cli;
2927         int rc;
2928
2929         ENTRY;
2930
2931         /* lru cleanup */
2932         if (cli->cl_cache != NULL) {
2933                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2934                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2935                 list_del_init(&cli->cl_lru_osc);
2936                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2937                 cli->cl_lru_left = NULL;
2938                 cl_cache_decref(cli->cl_cache);
2939                 cli->cl_cache = NULL;
2940         }
2941
2942         /* free memory of osc quota cache */
2943         osc_quota_cleanup(obd);
2944
2945         rc = client_obd_cleanup(obd);
2946
2947         ptlrpcd_decref();
2948         RETURN(rc);
2949 }
2950
2951 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2952 {
2953         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2954         return rc > 0 ? 0: rc;
2955 }
2956
2957 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2958 {
2959         return osc_process_config_base(obd, buf);
2960 }
2961
2962 static struct obd_ops osc_obd_ops = {
2963         .o_owner                = THIS_MODULE,
2964         .o_setup                = osc_setup,
2965         .o_precleanup           = osc_precleanup,
2966         .o_cleanup              = osc_cleanup,
2967         .o_add_conn             = client_import_add_conn,
2968         .o_del_conn             = client_import_del_conn,
2969         .o_connect              = client_connect_import,
2970         .o_reconnect            = osc_reconnect,
2971         .o_disconnect           = osc_disconnect,
2972         .o_statfs               = osc_statfs,
2973         .o_statfs_async         = osc_statfs_async,
2974         .o_create               = osc_create,
2975         .o_destroy              = osc_destroy,
2976         .o_getattr              = osc_getattr,
2977         .o_getattr_async        = osc_getattr_async,
2978         .o_setattr              = osc_setattr,
2979         .o_setattr_async        = osc_setattr_async,
2980         .o_iocontrol            = osc_iocontrol,
2981         .o_set_info_async       = osc_set_info_async,
2982         .o_import_event         = osc_import_event,
2983         .o_process_config       = osc_process_config,
2984         .o_quotactl             = osc_quotactl,
2985         .o_quotacheck           = osc_quotacheck,
2986 };
2987
2988 static int __init osc_init(void)
2989 {
2990         bool enable_proc = true;
2991         struct obd_type *type;
2992         int rc;
2993         ENTRY;
2994
2995         /* print an address of _any_ initialized kernel symbol from this
2996          * module, to allow debugging with gdb that doesn't support data
2997          * symbols from modules.*/
2998         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2999
3000         rc = lu_kmem_init(osc_caches);
3001         if (rc)
3002                 RETURN(rc);
3003
3004         type = class_search_type(LUSTRE_OSP_NAME);
3005         if (type != NULL && type->typ_procsym != NULL)
3006                 enable_proc = false;
3007
3008         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3009                                  LUSTRE_OSC_NAME, &osc_device_type);
3010         if (rc) {
3011                 lu_kmem_fini(osc_caches);
3012                 RETURN(rc);
3013         }
3014
3015         RETURN(rc);
3016 }
3017
3018 static void /*__exit*/ osc_exit(void)
3019 {
3020         class_unregister_type(LUSTRE_OSC_NAME);
3021         lu_kmem_fini(osc_caches);
3022 }
3023
/*
 * Kernel module metadata, followed by the cfs_module() declaration that
 * names osc_init() and osc_exit() as this module's init and exit routines.
 */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);