Whamcloud - gitweb
LU-2550 osc: set resend count properly
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                    rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458         lustre_set_wire_obdo(&body->oa, oa);
459
460         ptlrpc_request_set_replen(req);
461
462         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463             oa->o_flags == OBD_FL_DELORPHAN) {
464                 DEBUG_REQ(D_HA, req,
465                           "delorphan from OST integration");
466                 /* Don't resend the delorphan req */
467                 req->rq_no_resend = req->rq_no_delay = 1;
468         }
469
470         rc = ptlrpc_queue_wait(req);
471         if (rc)
472                 GOTO(out_req, rc);
473
474         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475         if (body == NULL)
476                 GOTO(out_req, rc = -EPROTO);
477
478         lustre_get_wire_obdo(oa, &body->oa);
479
480         /* This should really be sent by the OST */
481         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482         oa->o_valid |= OBD_MD_FLBLKSZ;
483
484         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485          * have valid lsm_oinfo data structs, so don't go touching that.
486          * This needs to be fixed in a big way.
487          */
488         lsm->lsm_object_id = oa->o_id;
489         lsm->lsm_object_seq = oa->o_seq;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_fsync_args *fa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *fa->fa_oi->oi_oa = body->oa;
585 out:
586         rc = fa->fa_upcall(fa->fa_cookie, rc);
587         RETURN(rc);
588 }
589
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591                   obd_enqueue_update_f upcall, void *cookie,
592                   struct ptlrpc_request_set *rqset)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_fsync_args *fa;
597         int                    rc;
598         ENTRY;
599
600         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601         if (req == NULL)
602                 RETURN(-ENOMEM);
603
604         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606         if (rc) {
607                 ptlrpc_request_free(req);
608                 RETURN(rc);
609         }
610
611         /* overload the size and blocks fields in the oa with start/end */
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615         osc_pack_capa(req, body, oinfo->oi_capa);
616
617         ptlrpc_request_set_replen(req);
618         req->rq_interpret_reply = osc_sync_interpret;
619
620         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621         fa = ptlrpc_req_async_args(req);
622         fa->fa_oi = oinfo;
623         fa->fa_upcall = upcall;
624         fa->fa_cookie = cookie;
625
626         if (rqset == PTLRPCD_SET)
627                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628         else
629                 ptlrpc_set_add_req(rqset, req);
630
631         RETURN (0);
632 }
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
666          * export) but disabled through procfs (flag in NS).
667          *
668          * This distinguishes from a case when ELC is not supported originally,
669          * when we still want to cancel locks in advance and just cancel them
670          * locally, without sending any RPC. */
671         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
672                 RETURN(0);
673
674         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
675         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
676         if (res == NULL)
677                 RETURN(0);
678
679         LDLM_RESOURCE_ADDREF(res);
680         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
681                                            lock_flags, 0, NULL);
682         LDLM_RESOURCE_DELREF(res);
683         ldlm_resource_putref(res);
684         RETURN(count);
685 }
686
687 static int osc_destroy_interpret(const struct lu_env *env,
688                                  struct ptlrpc_request *req, void *data,
689                                  int rc)
690 {
691         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
692
693         cfs_atomic_dec(&cli->cl_destroy_in_flight);
694         cfs_waitq_signal(&cli->cl_destroy_waitq);
695         return 0;
696 }
697
698 static int osc_can_send_destroy(struct client_obd *cli)
699 {
700         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
701             cli->cl_max_rpcs_in_flight) {
702                 /* The destroy request can be sent */
703                 return 1;
704         }
705         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
706             cli->cl_max_rpcs_in_flight) {
707                 /*
708                  * The counter has been modified between the two atomic
709                  * operations.
710                  */
711                 cfs_waitq_signal(&cli->cl_destroy_waitq);
712         }
713         return 0;
714 }
715
716 int osc_create(const struct lu_env *env, struct obd_export *exp,
717                struct obdo *oa, struct lov_stripe_md **ea,
718                struct obd_trans_info *oti)
719 {
720         int rc = 0;
721         ENTRY;
722
723         LASSERT(oa);
724         LASSERT(ea);
725         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
726
727         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
728             oa->o_flags == OBD_FL_RECREATE_OBJS) {
729                 RETURN(osc_real_create(exp, oa, ea, oti));
730         }
731
732         if (!fid_seq_is_mdt(oa->o_seq))
733                 RETURN(osc_real_create(exp, oa, ea, oti));
734
735         /* we should not get here anymore */
736         LBUG();
737
738         RETURN(rc);
739 }
740
741 /* Destroy requests can be async always on the client, and we don't even really
742  * care about the return code since the client cannot do anything at all about
743  * a destroy failure.
744  * When the MDS is unlinking a filename, it saves the file objects into a
745  * recovery llog, and these object records are cancelled when the OST reports
746  * they were destroyed and sync'd to disk (i.e. transaction committed).
747  * If the client dies, or the OST is down when the object should be destroyed,
748  * the records are not cancelled, and when the OST reconnects to the MDS next,
749  * it will retrieve the llog unlink logs and then sends the log cancellation
750  * cookies to the MDS after committing destroy transactions. */
751 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
752                        struct obdo *oa, struct lov_stripe_md *ea,
753                        struct obd_trans_info *oti, struct obd_export *md_export,
754                        void *capa)
755 {
756         struct client_obd     *cli = &exp->exp_obd->u.cli;
757         struct ptlrpc_request *req;
758         struct ost_body       *body;
759         CFS_LIST_HEAD(cancels);
760         int rc, count;
761         ENTRY;
762
763         if (!oa) {
764                 CDEBUG(D_INFO, "oa NULL\n");
765                 RETURN(-EINVAL);
766         }
767
768         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
769                                         LDLM_FL_DISCARD_DATA);
770
771         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
772         if (req == NULL) {
773                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
774                 RETURN(-ENOMEM);
775         }
776
777         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
778         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
779                                0, &cancels, count);
780         if (rc) {
781                 ptlrpc_request_free(req);
782                 RETURN(rc);
783         }
784
785         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
786         ptlrpc_at_set_req_timeout(req);
787
788         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
789                 oa->o_lcookie = *oti->oti_logcookies;
790         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
791         LASSERT(body);
792         lustre_set_wire_obdo(&body->oa, oa);
793
794         osc_pack_capa(req, body, (struct obd_capa *)capa);
795         ptlrpc_request_set_replen(req);
796
797         /* If osc_destory is for destroying the unlink orphan,
798          * sent from MDT to OST, which should not be blocked here,
799          * because the process might be triggered by ptlrpcd, and
800          * it is not good to block ptlrpcd thread (b=16006)*/
801         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
802                 req->rq_interpret_reply = osc_destroy_interpret;
803                 if (!osc_can_send_destroy(cli)) {
804                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
805                                                           NULL);
806
807                         /*
808                          * Wait until the number of on-going destroy RPCs drops
809                          * under max_rpc_in_flight
810                          */
811                         l_wait_event_exclusive(cli->cl_destroy_waitq,
812                                                osc_can_send_destroy(cli), &lwi);
813                 }
814         }
815
816         /* Do not wait for response */
817         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
818         RETURN(0);
819 }
820
821 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
822                                 long writing_bytes)
823 {
824         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
825
826         LASSERT(!(oa->o_valid & bits));
827
828         oa->o_valid |= bits;
829         client_obd_list_lock(&cli->cl_loi_list_lock);
830         oa->o_dirty = cli->cl_dirty;
831         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
832                      cli->cl_dirty_max)) {
833                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
834                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
835                 oa->o_undirty = 0;
836         } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
837                             cfs_atomic_read(&obd_dirty_transit_pages) >
838                             (long)(obd_max_dirty_pages + 1))) {
839                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
840                  * not covered by a lock thus they may safely race and trip
841                  * this CERROR() unless we add in a small fudge factor (+1). */
842                 CERROR("dirty %d - %d > system dirty_max %d\n",
843                        cfs_atomic_read(&obd_dirty_pages),
844                        cfs_atomic_read(&obd_dirty_transit_pages),
845                        obd_max_dirty_pages);
846                 oa->o_undirty = 0;
847         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
848                 CERROR("dirty %lu - dirty_max %lu too big???\n",
849                        cli->cl_dirty, cli->cl_dirty_max);
850                 oa->o_undirty = 0;
851         } else {
852                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
853                                       CFS_PAGE_SHIFT)*
854                                      (cli->cl_max_rpcs_in_flight + 1);
855                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
856         }
857         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
858         oa->o_dropped = cli->cl_lost_grant;
859         cli->cl_lost_grant = 0;
860         client_obd_list_unlock(&cli->cl_loi_list_lock);
861         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
862                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
863
864 }
865
866 void osc_update_next_shrink(struct client_obd *cli)
867 {
868         cli->cl_next_shrink_grant =
869                 cfs_time_shift(cli->cl_grant_shrink_interval);
870         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
871                cli->cl_next_shrink_grant);
872 }
873
874 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
875 {
876         client_obd_list_lock(&cli->cl_loi_list_lock);
877         cli->cl_avail_grant += grant;
878         client_obd_list_unlock(&cli->cl_loi_list_lock);
879 }
880
881 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
882 {
883         if (body->oa.o_valid & OBD_MD_FLGRANT) {
884                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
885                 __osc_update_grant(cli, body->oa.o_grant);
886         }
887 }
888
889 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
890                               obd_count keylen, void *key, obd_count vallen,
891                               void *val, struct ptlrpc_request_set *set);
892
893 static int osc_shrink_grant_interpret(const struct lu_env *env,
894                                       struct ptlrpc_request *req,
895                                       void *aa, int rc)
896 {
897         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
898         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
899         struct ost_body *body;
900
901         if (rc != 0) {
902                 __osc_update_grant(cli, oa->o_grant);
903                 GOTO(out, rc);
904         }
905
906         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
907         LASSERT(body);
908         osc_update_grant(cli, body);
909 out:
910         OBDO_FREE(oa);
911         return rc;
912 }
913
914 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
915 {
916         client_obd_list_lock(&cli->cl_loi_list_lock);
917         oa->o_grant = cli->cl_avail_grant / 4;
918         cli->cl_avail_grant -= oa->o_grant;
919         client_obd_list_unlock(&cli->cl_loi_list_lock);
920         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
921                 oa->o_valid |= OBD_MD_FLFLAGS;
922                 oa->o_flags = 0;
923         }
924         oa->o_flags |= OBD_FL_SHRINK_GRANT;
925         osc_update_next_shrink(cli);
926 }
927
928 /* Shrink the current grant, either from some large amount to enough for a
929  * full set of in-flight RPCs, or if we have already shrunk to that limit
930  * then to enough for a single RPC.  This avoids keeping more grant than
931  * needed, and avoids shrinking the grant piecemeal. */
932 static int osc_shrink_grant(struct client_obd *cli)
933 {
934         long target = (cli->cl_max_rpcs_in_flight + 1) *
935                       cli->cl_max_pages_per_rpc;
936
937         client_obd_list_lock(&cli->cl_loi_list_lock);
938         if (cli->cl_avail_grant <= target)
939                 target = cli->cl_max_pages_per_rpc;
940         client_obd_list_unlock(&cli->cl_loi_list_lock);
941
942         return osc_shrink_grant_to_target(cli, target);
943 }
944
945 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
946 {
947         int    rc = 0;
948         struct ost_body     *body;
949         ENTRY;
950
951         client_obd_list_lock(&cli->cl_loi_list_lock);
952         /* Don't shrink if we are already above or below the desired limit
953          * We don't want to shrink below a single RPC, as that will negatively
954          * impact block allocation and long-term performance. */
955         if (target < cli->cl_max_pages_per_rpc)
956                 target = cli->cl_max_pages_per_rpc;
957
958         if (target >= cli->cl_avail_grant) {
959                 client_obd_list_unlock(&cli->cl_loi_list_lock);
960                 RETURN(0);
961         }
962         client_obd_list_unlock(&cli->cl_loi_list_lock);
963
964         OBD_ALLOC_PTR(body);
965         if (!body)
966                 RETURN(-ENOMEM);
967
968         osc_announce_cached(cli, &body->oa, 0);
969
970         client_obd_list_lock(&cli->cl_loi_list_lock);
971         body->oa.o_grant = cli->cl_avail_grant - target;
972         cli->cl_avail_grant = target;
973         client_obd_list_unlock(&cli->cl_loi_list_lock);
974         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
975                 body->oa.o_valid |= OBD_MD_FLFLAGS;
976                 body->oa.o_flags = 0;
977         }
978         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
979         osc_update_next_shrink(cli);
980
981         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
982                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
983                                 sizeof(*body), body, NULL);
984         if (rc != 0)
985                 __osc_update_grant(cli, body->oa.o_grant);
986         OBD_FREE_PTR(body);
987         RETURN(rc);
988 }
989
990 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
991 static int osc_should_shrink_grant(struct client_obd *client)
992 {
993         cfs_time_t time = cfs_time_current();
994         cfs_time_t next_shrink = client->cl_next_shrink_grant;
995
996         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
997              OBD_CONNECT_GRANT_SHRINK) == 0)
998                 return 0;
999
1000         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1001                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1002                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1003                         return 1;
1004                 else
1005                         osc_update_next_shrink(client);
1006         }
1007         return 0;
1008 }
1009
1010 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1011 {
1012         struct client_obd *client;
1013
1014         cfs_list_for_each_entry(client, &item->ti_obd_list,
1015                                 cl_grant_shrink_list) {
1016                 if (osc_should_shrink_grant(client))
1017                         osc_shrink_grant(client);
1018         }
1019         return 0;
1020 }
1021
1022 static int osc_add_shrink_grant(struct client_obd *client)
1023 {
1024         int rc;
1025
1026         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1027                                        TIMEOUT_GRANT,
1028                                        osc_grant_shrink_grant_cb, NULL,
1029                                        &client->cl_grant_shrink_list);
1030         if (rc) {
1031                 CERROR("add grant client %s error %d\n",
1032                         client->cl_import->imp_obd->obd_name, rc);
1033                 return rc;
1034         }
1035         CDEBUG(D_CACHE, "add grant client %s \n",
1036                client->cl_import->imp_obd->obd_name);
1037         osc_update_next_shrink(client);
1038         return 0;
1039 }
1040
1041 static int osc_del_shrink_grant(struct client_obd *client)
1042 {
1043         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1044                                          TIMEOUT_GRANT);
1045 }
1046
1047 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1048 {
1049         /*
1050          * ocd_grant is the total grant amount we're expect to hold: if we've
1051          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1052          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1053          *
1054          * race is tolerable here: if we're evicted, but imp_state already
1055          * left EVICTED state, then cl_dirty must be 0 already.
1056          */
1057         client_obd_list_lock(&cli->cl_loi_list_lock);
1058         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1059                 cli->cl_avail_grant = ocd->ocd_grant;
1060         else
1061                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1062
1063         if (cli->cl_avail_grant < 0) {
1064                 CWARN("%s: available grant < 0, the OSS is probably not running"
1065                       " with patch from bug20278 (%ld) \n",
1066                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1067                 /* workaround for 1.6 servers which do not have
1068                  * the patch from bug20278 */
1069                 cli->cl_avail_grant = ocd->ocd_grant;
1070         }
1071
1072         /* determine the appropriate chunk size used by osc_extent. */
1073         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1074         client_obd_list_unlock(&cli->cl_loi_list_lock);
1075
1076         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1077                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1078                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1079
1080         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1081             cfs_list_empty(&cli->cl_grant_shrink_list))
1082                 osc_add_shrink_grant(cli);
1083 }
1084
1085 /* We assume that the reason this OSC got a short read is because it read
1086  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1087  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1088  * this stripe never got written at or beyond this stripe offset yet. */
1089 static void handle_short_read(int nob_read, obd_count page_count,
1090                               struct brw_page **pga)
1091 {
1092         char *ptr;
1093         int i = 0;
1094
1095         /* skip bytes read OK */
1096         while (nob_read > 0) {
1097                 LASSERT (page_count > 0);
1098
1099                 if (pga[i]->count > nob_read) {
1100                         /* EOF inside this page */
1101                         ptr = cfs_kmap(pga[i]->pg) +
1102                                 (pga[i]->off & ~CFS_PAGE_MASK);
1103                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1104                         cfs_kunmap(pga[i]->pg);
1105                         page_count--;
1106                         i++;
1107                         break;
1108                 }
1109
1110                 nob_read -= pga[i]->count;
1111                 page_count--;
1112                 i++;
1113         }
1114
1115         /* zero remaining pages */
1116         while (page_count-- > 0) {
1117                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1118                 memset(ptr, 0, pga[i]->count);
1119                 cfs_kunmap(pga[i]->pg);
1120                 i++;
1121         }
1122 }
1123
1124 static int check_write_rcs(struct ptlrpc_request *req,
1125                            int requested_nob, int niocount,
1126                            obd_count page_count, struct brw_page **pga)
1127 {
1128         int     i;
1129         __u32   *remote_rcs;
1130
1131         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1132                                                   sizeof(*remote_rcs) *
1133                                                   niocount);
1134         if (remote_rcs == NULL) {
1135                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1136                 return(-EPROTO);
1137         }
1138
1139         /* return error if any niobuf was in error */
1140         for (i = 0; i < niocount; i++) {
1141                 if ((int)remote_rcs[i] < 0)
1142                         return(remote_rcs[i]);
1143
1144                 if (remote_rcs[i] != 0) {
1145                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1146                                 i, remote_rcs[i], req);
1147                         return(-EPROTO);
1148                 }
1149         }
1150
1151         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1152                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1153                        req->rq_bulk->bd_nob_transferred, requested_nob);
1154                 return(-EPROTO);
1155         }
1156
1157         return (0);
1158 }
1159
1160 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1161 {
1162         if (p1->flag != p2->flag) {
1163                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1164                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1165
1166                 /* warn if we try to combine flags that we don't know to be
1167                  * safe to combine */
1168                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1169                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1170                               "report this at http://bugs.whamcloud.com/\n",
1171                               p1->flag, p2->flag);
1172                 }
1173                 return 0;
1174         }
1175
1176         return (p1->off + p1->count == p2->off);
1177 }
1178
1179 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1180                                    struct brw_page **pga, int opc,
1181                                    cksum_type_t cksum_type)
1182 {
1183         __u32                           cksum;
1184         int                             i = 0;
1185         struct cfs_crypto_hash_desc     *hdesc;
1186         unsigned int                    bufsize;
1187         int                             err;
1188         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1189
1190         LASSERT(pg_count > 0);
1191
1192         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1193         if (IS_ERR(hdesc)) {
1194                 CERROR("Unable to initialize checksum hash %s\n",
1195                        cfs_crypto_hash_name(cfs_alg));
1196                 return PTR_ERR(hdesc);
1197         }
1198
1199         while (nob > 0 && pg_count > 0) {
1200                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1201
1202                 /* corrupt the data before we compute the checksum, to
1203                  * simulate an OST->client data error */
1204                 if (i == 0 && opc == OST_READ &&
1205                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1206                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1207                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1208                         memcpy(ptr + off, "bad1", min(4, nob));
1209                         cfs_kunmap(pga[i]->pg);
1210                 }
1211                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1212                                   pga[i]->off & ~CFS_PAGE_MASK,
1213                                   count);
1214                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1215                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1216
1217                 nob -= pga[i]->count;
1218                 pg_count--;
1219                 i++;
1220         }
1221
1222         bufsize = 4;
1223         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1224
1225         if (err)
1226                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1227
1228         /* For sending we only compute the wrong checksum instead
1229          * of corrupting the data so it is still correct on a redo */
1230         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1231                 cksum++;
1232
1233         return cksum;
1234 }
1235
1236 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1237                                 struct lov_stripe_md *lsm, obd_count page_count,
1238                                 struct brw_page **pga,
1239                                 struct ptlrpc_request **reqp,
1240                                 struct obd_capa *ocapa, int reserve,
1241                                 int resend)
1242 {
1243         struct ptlrpc_request   *req;
1244         struct ptlrpc_bulk_desc *desc;
1245         struct ost_body         *body;
1246         struct obd_ioobj        *ioobj;
1247         struct niobuf_remote    *niobuf;
1248         int niocount, i, requested_nob, opc, rc;
1249         struct osc_brw_async_args *aa;
1250         struct req_capsule      *pill;
1251         struct brw_page *pg_prev;
1252
1253         ENTRY;
1254         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1255                 RETURN(-ENOMEM); /* Recoverable */
1256         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1257                 RETURN(-EINVAL); /* Fatal */
1258
1259         if ((cmd & OBD_BRW_WRITE) != 0) {
1260                 opc = OST_WRITE;
1261                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1262                                                 cli->cl_import->imp_rq_pool,
1263                                                 &RQF_OST_BRW_WRITE);
1264         } else {
1265                 opc = OST_READ;
1266                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1267         }
1268         if (req == NULL)
1269                 RETURN(-ENOMEM);
1270
1271         for (niocount = i = 1; i < page_count; i++) {
1272                 if (!can_merge_pages(pga[i - 1], pga[i]))
1273                         niocount++;
1274         }
1275
1276         pill = &req->rq_pill;
1277         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1278                              sizeof(*ioobj));
1279         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1280                              niocount * sizeof(*niobuf));
1281         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1282
1283         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1284         if (rc) {
1285                 ptlrpc_request_free(req);
1286                 RETURN(rc);
1287         }
1288         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1289         ptlrpc_at_set_req_timeout(req);
1290         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1291          * retry logic */
1292         req->rq_no_retry_einprogress = 1;
1293
1294         if (opc == OST_WRITE)
1295                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1296                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1297         else
1298                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1299                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1300
1301         if (desc == NULL)
1302                 GOTO(out, rc = -ENOMEM);
1303         /* NB request now owns desc and will free it when it gets freed */
1304
1305         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1306         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1307         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1308         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1309
1310         lustre_set_wire_obdo(&body->oa, oa);
1311
1312         obdo_to_ioobj(oa, ioobj);
1313         ioobj->ioo_bufcnt = niocount;
1314         osc_pack_capa(req, body, ocapa);
1315         LASSERT (page_count > 0);
1316         pg_prev = pga[0];
1317         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1318                 struct brw_page *pg = pga[i];
1319                 int poff = pg->off & ~CFS_PAGE_MASK;
1320
1321                 LASSERT(pg->count > 0);
1322                 /* make sure there is no gap in the middle of page array */
1323                 LASSERTF(page_count == 1 ||
1324                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1325                           ergo(i > 0 && i < page_count - 1,
1326                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1327                           ergo(i == page_count - 1, poff == 0)),
1328                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1329                          i, page_count, pg, pg->off, pg->count);
1330 #ifdef __linux__
1331                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1332                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1333                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1334                          i, page_count,
1335                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1336                          pg_prev->pg, page_private(pg_prev->pg),
1337                          pg_prev->pg->index, pg_prev->off);
1338 #else
1339                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1340                          "i %d p_c %u\n", i, page_count);
1341 #endif
1342                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1343                         (pg->flag & OBD_BRW_SRVLOCK));
1344
1345                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1346                 requested_nob += pg->count;
1347
1348                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1349                         niobuf--;
1350                         niobuf->len += pg->count;
1351                 } else {
1352                         niobuf->offset = pg->off;
1353                         niobuf->len    = pg->count;
1354                         niobuf->flags  = pg->flag;
1355                 }
1356                 pg_prev = pg;
1357         }
1358
1359         LASSERTF((void *)(niobuf - niocount) ==
1360                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1361                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1362                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1363
1364         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1365         if (resend) {
1366                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1367                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1368                         body->oa.o_flags = 0;
1369                 }
1370                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1371         }
1372
1373         if (osc_should_shrink_grant(cli))
1374                 osc_shrink_grant_local(cli, &body->oa);
1375
1376         /* size[REQ_REC_OFF] still sizeof (*body) */
1377         if (opc == OST_WRITE) {
1378                 if (cli->cl_checksum &&
1379                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1380                         /* store cl_cksum_type in a local variable since
1381                          * it can be changed via lprocfs */
1382                         cksum_type_t cksum_type = cli->cl_cksum_type;
1383
1384                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1385                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1386                                 body->oa.o_flags = 0;
1387                         }
1388                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1389                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1390                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1391                                                              page_count, pga,
1392                                                              OST_WRITE,
1393                                                              cksum_type);
1394                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1395                                body->oa.o_cksum);
1396                         /* save this in 'oa', too, for later checking */
1397                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1398                         oa->o_flags |= cksum_type_pack(cksum_type);
1399                 } else {
1400                         /* clear out the checksum flag, in case this is a
1401                          * resend but cl_checksum is no longer set. b=11238 */
1402                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1403                 }
1404                 oa->o_cksum = body->oa.o_cksum;
1405                 /* 1 RC per niobuf */
1406                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1407                                      sizeof(__u32) * niocount);
1408         } else {
1409                 if (cli->cl_checksum &&
1410                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1411                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1412                                 body->oa.o_flags = 0;
1413                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1414                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1415                 }
1416         }
1417         ptlrpc_request_set_replen(req);
1418
1419         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1420         aa = ptlrpc_req_async_args(req);
1421         aa->aa_oa = oa;
1422         aa->aa_requested_nob = requested_nob;
1423         aa->aa_nio_count = niocount;
1424         aa->aa_page_count = page_count;
1425         aa->aa_resends = 0;
1426         aa->aa_ppga = pga;
1427         aa->aa_cli = cli;
1428         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1429         if (ocapa && reserve)
1430                 aa->aa_ocapa = capa_get(ocapa);
1431
1432         *reqp = req;
1433         RETURN(0);
1434
1435  out:
1436         ptlrpc_req_finished(req);
1437         RETURN(rc);
1438 }
1439
1440 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1441                                 __u32 client_cksum, __u32 server_cksum, int nob,
1442                                 obd_count page_count, struct brw_page **pga,
1443                                 cksum_type_t client_cksum_type)
1444 {
1445         __u32 new_cksum;
1446         char *msg;
1447         cksum_type_t cksum_type;
1448
1449         if (server_cksum == client_cksum) {
1450                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1451                 return 0;
1452         }
1453
1454         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1455                                        oa->o_flags : 0);
1456         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1457                                       cksum_type);
1458
1459         if (cksum_type != client_cksum_type)
1460                 msg = "the server did not use the checksum type specified in "
1461                       "the original request - likely a protocol problem";
1462         else if (new_cksum == server_cksum)
1463                 msg = "changed on the client after we checksummed it - "
1464                       "likely false positive due to mmap IO (bug 11742)";
1465         else if (new_cksum == client_cksum)
1466                 msg = "changed in transit before arrival at OST";
1467         else
1468                 msg = "changed in transit AND doesn't match the original - "
1469                       "likely false positive due to mmap IO (bug 11742)";
1470
1471         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1472                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1473                            msg, libcfs_nid2str(peer->nid),
1474                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1475                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1476                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1477                            oa->o_id,
1478                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1479                            pga[0]->off,
1480                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1481         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1482                "client csum now %x\n", client_cksum, client_cksum_type,
1483                server_cksum, cksum_type, new_cksum);
1484         return 1;
1485 }
1486
1487 /* Note rc enters this function as number of bytes transferred */
1488 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1489 {
1490         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1491         const lnet_process_id_t *peer =
1492                         &req->rq_import->imp_connection->c_peer;
1493         struct client_obd *cli = aa->aa_cli;
1494         struct ost_body *body;
1495         __u32 client_cksum = 0;
1496         ENTRY;
1497
1498         if (rc < 0 && rc != -EDQUOT) {
1499                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1500                 RETURN(rc);
1501         }
1502
1503         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1504         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1505         if (body == NULL) {
1506                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1507                 RETURN(-EPROTO);
1508         }
1509
1510         /* set/clear over quota flag for a uid/gid */
1511         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1512             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1513                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1514
1515                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1516                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1517                        body->oa.o_flags);
1518                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1519         }
1520
1521         osc_update_grant(cli, body);
1522
1523         if (rc < 0)
1524                 RETURN(rc);
1525
1526         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1527                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1528
1529         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1530                 if (rc > 0) {
1531                         CERROR("Unexpected +ve rc %d\n", rc);
1532                         RETURN(-EPROTO);
1533                 }
1534                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1535
1536                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1537                         RETURN(-EAGAIN);
1538
1539                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1540                     check_write_checksum(&body->oa, peer, client_cksum,
1541                                          body->oa.o_cksum, aa->aa_requested_nob,
1542                                          aa->aa_page_count, aa->aa_ppga,
1543                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1544                         RETURN(-EAGAIN);
1545
1546                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1547                                      aa->aa_page_count, aa->aa_ppga);
1548                 GOTO(out, rc);
1549         }
1550
1551         /* The rest of this function executes only for OST_READs */
1552
1553         /* if unwrap_bulk failed, return -EAGAIN to retry */
1554         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1555         if (rc < 0)
1556                 GOTO(out, rc = -EAGAIN);
1557
1558         if (rc > aa->aa_requested_nob) {
1559                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1560                        aa->aa_requested_nob);
1561                 RETURN(-EPROTO);
1562         }
1563
1564         if (rc != req->rq_bulk->bd_nob_transferred) {
1565                 CERROR ("Unexpected rc %d (%d transferred)\n",
1566                         rc, req->rq_bulk->bd_nob_transferred);
1567                 return (-EPROTO);
1568         }
1569
1570         if (rc < aa->aa_requested_nob)
1571                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1572
1573         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1574                 static int cksum_counter;
1575                 __u32      server_cksum = body->oa.o_cksum;
1576                 char      *via;
1577                 char      *router;
1578                 cksum_type_t cksum_type;
1579
1580                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1581                                                body->oa.o_flags : 0);
1582                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1583                                                  aa->aa_ppga, OST_READ,
1584                                                  cksum_type);
1585
1586                 if (peer->nid == req->rq_bulk->bd_sender) {
1587                         via = router = "";
1588                 } else {
1589                         via = " via ";
1590                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1591                 }
1592
1593                 if (server_cksum == ~0 && rc > 0) {
1594                         CERROR("Protocol error: server %s set the 'checksum' "
1595                                "bit, but didn't send a checksum.  Not fatal, "
1596                                "but please notify on http://bugs.whamcloud.com/\n",
1597                                libcfs_nid2str(peer->nid));
1598                 } else if (server_cksum != client_cksum) {
1599                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1600                                            "%s%s%s inode "DFID" object "
1601                                            LPU64"/"LPU64" extent "
1602                                            "["LPU64"-"LPU64"]\n",
1603                                            req->rq_import->imp_obd->obd_name,
1604                                            libcfs_nid2str(peer->nid),
1605                                            via, router,
1606                                            body->oa.o_valid & OBD_MD_FLFID ?
1607                                                 body->oa.o_parent_seq : (__u64)0,
1608                                            body->oa.o_valid & OBD_MD_FLFID ?
1609                                                 body->oa.o_parent_oid : 0,
1610                                            body->oa.o_valid & OBD_MD_FLFID ?
1611                                                 body->oa.o_parent_ver : 0,
1612                                            body->oa.o_id,
1613                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1614                                                 body->oa.o_seq : (__u64)0,
1615                                            aa->aa_ppga[0]->off,
1616                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1617                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1618                                                                         1);
1619                         CERROR("client %x, server %x, cksum_type %x\n",
1620                                client_cksum, server_cksum, cksum_type);
1621                         cksum_counter = 0;
1622                         aa->aa_oa->o_cksum = client_cksum;
1623                         rc = -EAGAIN;
1624                 } else {
1625                         cksum_counter++;
1626                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1627                         rc = 0;
1628                 }
1629         } else if (unlikely(client_cksum)) {
1630                 static int cksum_missed;
1631
1632                 cksum_missed++;
1633                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1634                         CERROR("Checksum %u requested from %s but not sent\n",
1635                                cksum_missed, libcfs_nid2str(peer->nid));
1636         } else {
1637                 rc = 0;
1638         }
1639 out:
1640         if (rc >= 0)
1641                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1642
1643         RETURN(rc);
1644 }
1645
1646 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1647                             struct lov_stripe_md *lsm,
1648                             obd_count page_count, struct brw_page **pga,
1649                             struct obd_capa *ocapa)
1650 {
1651         struct ptlrpc_request *req;
1652         int                    rc;
1653         cfs_waitq_t            waitq;
1654         int                    generation, resends = 0;
1655         struct l_wait_info     lwi;
1656
1657         ENTRY;
1658
1659         cfs_waitq_init(&waitq);
1660         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1661
1662 restart_bulk:
1663         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1664                                   page_count, pga, &req, ocapa, 0, resends);
1665         if (rc != 0)
1666                 return (rc);
1667
1668         if (resends) {
1669                 req->rq_generation_set = 1;
1670                 req->rq_import_generation = generation;
1671                 req->rq_sent = cfs_time_current_sec() + resends;
1672         }
1673
1674         rc = ptlrpc_queue_wait(req);
1675
1676         if (rc == -ETIMEDOUT && req->rq_resend) {
1677                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1678                 ptlrpc_req_finished(req);
1679                 goto restart_bulk;
1680         }
1681
1682         rc = osc_brw_fini_request(req, rc);
1683
1684         ptlrpc_req_finished(req);
1685         /* When server return -EINPROGRESS, client should always retry
1686          * regardless of the number of times the bulk was resent already.*/
1687         if (osc_recoverable_error(rc)) {
1688                 resends++;
1689                 if (rc != -EINPROGRESS &&
1690                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1691                         CERROR("%s: too many resend retries for object: "
1692                                ""LPU64":"LPU64", rc = %d.\n",
1693                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1694                         goto out;
1695                 }
1696                 if (generation !=
1697                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1698                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1699                                ""LPU64":"LPU64", rc = %d.\n",
1700                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1701                         goto out;
1702                 }
1703
1704                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1705                                        NULL);
1706                 l_wait_event(waitq, 0, &lwi);
1707
1708                 goto restart_bulk;
1709         }
1710 out:
1711         if (rc == -EAGAIN || rc == -EINPROGRESS)
1712                 rc = -EIO;
1713         RETURN (rc);
1714 }
1715
1716 static int osc_brw_redo_request(struct ptlrpc_request *request,
1717                                 struct osc_brw_async_args *aa, int rc)
1718 {
1719         struct ptlrpc_request *new_req;
1720         struct osc_brw_async_args *new_aa;
1721         struct osc_async_page *oap;
1722         ENTRY;
1723
1724         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1725                   "redo for recoverable error %d", rc);
1726
1727         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1728                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1729                                   aa->aa_cli, aa->aa_oa,
1730                                   NULL /* lsm unused by osc currently */,
1731                                   aa->aa_page_count, aa->aa_ppga,
1732                                   &new_req, aa->aa_ocapa, 0, 1);
1733         if (rc)
1734                 RETURN(rc);
1735
1736         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1737                 if (oap->oap_request != NULL) {
1738                         LASSERTF(request == oap->oap_request,
1739                                  "request %p != oap_request %p\n",
1740                                  request, oap->oap_request);
1741                         if (oap->oap_interrupted) {
1742                                 ptlrpc_req_finished(new_req);
1743                                 RETURN(-EINTR);
1744                         }
1745                 }
1746         }
1747         /* New request takes over pga and oaps from old request.
1748          * Note that copying a list_head doesn't work, need to move it... */
1749         aa->aa_resends++;
1750         new_req->rq_interpret_reply = request->rq_interpret_reply;
1751         new_req->rq_async_args = request->rq_async_args;
1752         /* cap resend delay to the current request timeout, this is similar to
1753          * what ptlrpc does (see after_reply()) */
1754         if (aa->aa_resends > new_req->rq_timeout)
1755                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1756         else
1757                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1758         new_req->rq_generation_set = 1;
1759         new_req->rq_import_generation = request->rq_import_generation;
1760
1761         new_aa = ptlrpc_req_async_args(new_req);
1762
1763         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1764         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1765         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1766         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1767         new_aa->aa_resends = aa->aa_resends;
1768
1769         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1770                 if (oap->oap_request) {
1771                         ptlrpc_req_finished(oap->oap_request);
1772                         oap->oap_request = ptlrpc_request_addref(new_req);
1773                 }
1774         }
1775
1776         new_aa->aa_ocapa = aa->aa_ocapa;
1777         aa->aa_ocapa = NULL;
1778
1779         /* XXX: This code will run into problem if we're going to support
1780          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1781          * and wait for all of them to be finished. We should inherit request
1782          * set from old request. */
1783         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1784
1785         DEBUG_REQ(D_INFO, new_req, "new request");
1786         RETURN(0);
1787 }
1788
1789 /*
1790  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1791  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1792  * fine for our small page arrays and doesn't require allocation.  its an
1793  * insertion sort that swaps elements that are strides apart, shrinking the
1794  * stride down until its '1' and the array is sorted.
1795  */
1796 static void sort_brw_pages(struct brw_page **array, int num)
1797 {
1798         int stride, i, j;
1799         struct brw_page *tmp;
1800
1801         if (num == 1)
1802                 return;
1803         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1804                 ;
1805
1806         do {
1807                 stride /= 3;
1808                 for (i = stride ; i < num ; i++) {
1809                         tmp = array[i];
1810                         j = i;
1811                         while (j >= stride && array[j - stride]->off > tmp->off) {
1812                                 array[j] = array[j - stride];
1813                                 j -= stride;
1814                         }
1815                         array[j] = tmp;
1816                 }
1817         } while (stride > 1);
1818 }
1819
1820 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1821 {
1822         int count = 1;
1823         int offset;
1824         int i = 0;
1825
1826         LASSERT (pages > 0);
1827         offset = pg[i]->off & ~CFS_PAGE_MASK;
1828
1829         for (;;) {
1830                 pages--;
1831                 if (pages == 0)         /* that's all */
1832                         return count;
1833
1834                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1835                         return count;   /* doesn't end on page boundary */
1836
1837                 i++;
1838                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1839                 if (offset != 0)        /* doesn't start on page boundary */
1840                         return count;
1841
1842                 count++;
1843         }
1844 }
1845
1846 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1847 {
1848         struct brw_page **ppga;
1849         int i;
1850
1851         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1852         if (ppga == NULL)
1853                 return NULL;
1854
1855         for (i = 0; i < count; i++)
1856                 ppga[i] = pga + i;
1857         return ppga;
1858 }
1859
1860 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1861 {
1862         LASSERT(ppga != NULL);
1863         OBD_FREE(ppga, sizeof(*ppga) * count);
1864 }
1865
1866 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1867                    obd_count page_count, struct brw_page *pga,
1868                    struct obd_trans_info *oti)
1869 {
1870         struct obdo *saved_oa = NULL;
1871         struct brw_page **ppga, **orig;
1872         struct obd_import *imp = class_exp2cliimp(exp);
1873         struct client_obd *cli;
1874         int rc, page_count_orig;
1875         ENTRY;
1876
1877         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1878         cli = &imp->imp_obd->u.cli;
1879
1880         if (cmd & OBD_BRW_CHECK) {
1881                 /* The caller just wants to know if there's a chance that this
1882                  * I/O can succeed */
1883
1884                 if (imp->imp_invalid)
1885                         RETURN(-EIO);
1886                 RETURN(0);
1887         }
1888
1889         /* test_brw with a failed create can trip this, maybe others. */
1890         LASSERT(cli->cl_max_pages_per_rpc);
1891
1892         rc = 0;
1893
1894         orig = ppga = osc_build_ppga(pga, page_count);
1895         if (ppga == NULL)
1896                 RETURN(-ENOMEM);
1897         page_count_orig = page_count;
1898
1899         sort_brw_pages(ppga, page_count);
1900         while (page_count) {
1901                 obd_count pages_per_brw;
1902
1903                 if (page_count > cli->cl_max_pages_per_rpc)
1904                         pages_per_brw = cli->cl_max_pages_per_rpc;
1905                 else
1906                         pages_per_brw = page_count;
1907
1908                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1909
1910                 if (saved_oa != NULL) {
1911                         /* restore previously saved oa */
1912                         *oinfo->oi_oa = *saved_oa;
1913                 } else if (page_count > pages_per_brw) {
1914                         /* save a copy of oa (brw will clobber it) */
1915                         OBDO_ALLOC(saved_oa);
1916                         if (saved_oa == NULL)
1917                                 GOTO(out, rc = -ENOMEM);
1918                         *saved_oa = *oinfo->oi_oa;
1919                 }
1920
1921                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1922                                       pages_per_brw, ppga, oinfo->oi_capa);
1923
1924                 if (rc != 0)
1925                         break;
1926
1927                 page_count -= pages_per_brw;
1928                 ppga += pages_per_brw;
1929         }
1930
1931 out:
1932         osc_release_ppga(orig, page_count_orig);
1933
1934         if (saved_oa != NULL)
1935                 OBDO_FREE(saved_oa);
1936
1937         RETURN(rc);
1938 }
1939
1940 static int brw_interpret(const struct lu_env *env,
1941                          struct ptlrpc_request *req, void *data, int rc)
1942 {
1943         struct osc_brw_async_args *aa = data;
1944         struct osc_extent *ext;
1945         struct osc_extent *tmp;
1946         struct cl_object  *obj = NULL;
1947         struct client_obd *cli = aa->aa_cli;
1948         ENTRY;
1949
1950         rc = osc_brw_fini_request(req, rc);
1951         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1952         /* When server return -EINPROGRESS, client should always retry
1953          * regardless of the number of times the bulk was resent already. */
1954         if (osc_recoverable_error(rc)) {
1955                 if (req->rq_import_generation !=
1956                     req->rq_import->imp_generation) {
1957                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1958                                ""LPU64":"LPU64", rc = %d.\n",
1959                                req->rq_import->imp_obd->obd_name,
1960                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1961                 } else if (rc == -EINPROGRESS ||
1962                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1963                         rc = osc_brw_redo_request(req, aa, rc);
1964                 } else {
1965                         CERROR("%s: too many resent retries for object: "
1966                                ""LPU64":"LPU64", rc = %d.\n",
1967                                req->rq_import->imp_obd->obd_name,
1968                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1969                 }
1970
1971                 if (rc == 0)
1972                         RETURN(0);
1973                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1974                         rc = -EIO;
1975         }
1976
1977         if (aa->aa_ocapa) {
1978                 capa_put(aa->aa_ocapa);
1979                 aa->aa_ocapa = NULL;
1980         }
1981
1982         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1983                 if (obj == NULL && rc == 0) {
1984                         obj = osc2cl(ext->oe_obj);
1985                         cl_object_get(obj);
1986                 }
1987
1988                 cfs_list_del_init(&ext->oe_link);
1989                 osc_extent_finish(env, ext, 1, rc);
1990         }
1991         LASSERT(cfs_list_empty(&aa->aa_exts));
1992         LASSERT(cfs_list_empty(&aa->aa_oaps));
1993
1994         if (obj != NULL) {
1995                 struct obdo *oa = aa->aa_oa;
1996                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1997                 unsigned long valid = 0;
1998
1999                 LASSERT(rc == 0);
2000                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2001                         attr->cat_blocks = oa->o_blocks;
2002                         valid |= CAT_BLOCKS;
2003                 }
2004                 if (oa->o_valid & OBD_MD_FLMTIME) {
2005                         attr->cat_mtime = oa->o_mtime;
2006                         valid |= CAT_MTIME;
2007                 }
2008                 if (oa->o_valid & OBD_MD_FLATIME) {
2009                         attr->cat_atime = oa->o_atime;
2010                         valid |= CAT_ATIME;
2011                 }
2012                 if (oa->o_valid & OBD_MD_FLCTIME) {
2013                         attr->cat_ctime = oa->o_ctime;
2014                         valid |= CAT_CTIME;
2015                 }
2016                 if (valid != 0) {
2017                         cl_object_attr_lock(obj);
2018                         cl_object_attr_set(env, obj, attr, valid);
2019                         cl_object_attr_unlock(obj);
2020                 }
2021                 cl_object_put(env, obj);
2022         }
2023         OBDO_FREE(aa->aa_oa);
2024
2025         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2026                           req->rq_bulk->bd_nob_transferred);
2027         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2028         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2029
2030         client_obd_list_lock(&cli->cl_loi_list_lock);
2031         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2032          * is called so we know whether to go to sync BRWs or wait for more
2033          * RPCs to complete */
2034         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2035                 cli->cl_w_in_flight--;
2036         else
2037                 cli->cl_r_in_flight--;
2038         osc_wake_cache_waiters(cli);
2039         client_obd_list_unlock(&cli->cl_loi_list_lock);
2040
2041         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2042         RETURN(rc);
2043 }
2044
2045 /**
2046  * Build an RPC by the list of extent @ext_list. The caller must ensure
2047  * that the total pages in this list are NOT over max pages per RPC.
2048  * Extents in the list must be in OES_RPC state.
2049  */
2050 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2051                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2052 {
2053         struct ptlrpc_request *req = NULL;
2054         struct osc_extent *ext;
2055         CFS_LIST_HEAD(rpc_list);
2056         struct brw_page **pga = NULL;
2057         struct osc_brw_async_args *aa = NULL;
2058         struct obdo *oa = NULL;
2059         struct osc_async_page *oap;
2060         struct osc_async_page *tmp;
2061         struct cl_req *clerq = NULL;
2062         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2063         struct ldlm_lock *lock = NULL;
2064         struct cl_req_attr crattr;
2065         obd_off starting_offset = OBD_OBJECT_EOF;
2066         obd_off ending_offset = 0;
2067         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2068
2069         ENTRY;
2070         LASSERT(!cfs_list_empty(ext_list));
2071
2072         /* add pages into rpc_list to build BRW rpc */
2073         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2074                 LASSERT(ext->oe_state == OES_RPC);
2075                 mem_tight |= ext->oe_memalloc;
2076                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2077                         ++page_count;
2078                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2079                         if (starting_offset > oap->oap_obj_off)
2080                                 starting_offset = oap->oap_obj_off;
2081                         else
2082                                 LASSERT(oap->oap_page_off == 0);
2083                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2084                                 ending_offset = oap->oap_obj_off +
2085                                                 oap->oap_count;
2086                         else
2087                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2088                                         CFS_PAGE_SIZE);
2089                 }
2090         }
2091
2092         if (mem_tight)
2093                 mpflag = cfs_memory_pressure_get_and_set();
2094
2095         memset(&crattr, 0, sizeof crattr);
2096         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2097         if (pga == NULL)
2098                 GOTO(out, rc = -ENOMEM);
2099
2100         OBDO_ALLOC(oa);
2101         if (oa == NULL)
2102                 GOTO(out, rc = -ENOMEM);
2103
2104         i = 0;
2105         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2106                 struct cl_page *page = oap2cl_page(oap);
2107                 if (clerq == NULL) {
2108                         clerq = cl_req_alloc(env, page, crt,
2109                                              1 /* only 1-object rpcs for
2110                                                 * now */);
2111                         if (IS_ERR(clerq))
2112                                 GOTO(out, rc = PTR_ERR(clerq));
2113                         lock = oap->oap_ldlm_lock;
2114                 }
2115                 if (mem_tight)
2116                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2117                 pga[i] = &oap->oap_brw_page;
2118                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2119                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2120                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2121                 i++;
2122                 cl_req_page_add(env, clerq, page);
2123         }
2124
2125         /* always get the data for the obdo for the rpc */
2126         LASSERT(clerq != NULL);
2127         crattr.cra_oa = oa;
2128         crattr.cra_capa = NULL;
2129         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2130         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2131         if (lock) {
2132                 oa->o_handle = lock->l_remote_handle;
2133                 oa->o_valid |= OBD_MD_FLHANDLE;
2134         }
2135
2136         rc = cl_req_prep(env, clerq);
2137         if (rc != 0) {
2138                 CERROR("cl_req_prep failed: %d\n", rc);
2139                 GOTO(out, rc);
2140         }
2141
2142         sort_brw_pages(pga, page_count);
2143         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2144                         pga, &req, crattr.cra_capa, 1, 0);
2145         if (rc != 0) {
2146                 CERROR("prep_req failed: %d\n", rc);
2147                 GOTO(out, rc);
2148         }
2149
2150         req->rq_interpret_reply = brw_interpret;
2151         if (mem_tight != 0)
2152                 req->rq_memalloc = 1;
2153
2154         /* Need to update the timestamps after the request is built in case
2155          * we race with setattr (locally or in queue at OST).  If OST gets
2156          * later setattr before earlier BRW (as determined by the request xid),
2157          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2158          * way to do this in a single call.  bug 10150 */
2159         cl_req_attr_set(env, clerq, &crattr,
2160                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2161
2162         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2163
2164         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2165         aa = ptlrpc_req_async_args(req);
2166         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2167         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2168         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2169         cfs_list_splice_init(ext_list, &aa->aa_exts);
2170         aa->aa_clerq = clerq;
2171
2172         /* queued sync pages can be torn down while the pages
2173          * were between the pending list and the rpc */
2174         tmp = NULL;
2175         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2176                 /* only one oap gets a request reference */
2177                 if (tmp == NULL)
2178                         tmp = oap;
2179                 if (oap->oap_interrupted && !req->rq_intr) {
2180                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2181                                         oap, req);
2182                         ptlrpc_mark_interrupted(req);
2183                 }
2184         }
2185         if (tmp != NULL)
2186                 tmp->oap_request = ptlrpc_request_addref(req);
2187
2188         client_obd_list_lock(&cli->cl_loi_list_lock);
2189         starting_offset >>= CFS_PAGE_SHIFT;
2190         if (cmd == OBD_BRW_READ) {
2191                 cli->cl_r_in_flight++;
2192                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2193                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2194                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2195                                       starting_offset + 1);
2196         } else {
2197                 cli->cl_w_in_flight++;
2198                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2199                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2200                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2201                                       starting_offset + 1);
2202         }
2203         client_obd_list_unlock(&cli->cl_loi_list_lock);
2204
2205         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2206                   page_count, aa, cli->cl_r_in_flight,
2207                   cli->cl_w_in_flight);
2208
2209         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2210          * see which CPU/NUMA node the majority of pages were allocated
2211          * on, and try to assign the async RPC to the CPU core
2212          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2213          *
2214          * But on the other hand, we expect that multiple ptlrpcd
2215          * threads and the initial write sponsor can run in parallel,
2216          * especially when data checksum is enabled, which is CPU-bound
2217          * operation and single ptlrpcd thread cannot process in time.
2218          * So more ptlrpcd threads sharing BRW load
2219          * (with PDL_POLICY_ROUND) seems better.
2220          */
2221         ptlrpcd_add_req(req, pol, -1);
2222         rc = 0;
2223         EXIT;
2224
2225 out:
2226         if (mem_tight != 0)
2227                 cfs_memory_pressure_restore(mpflag);
2228
2229         capa_put(crattr.cra_capa);
2230         if (rc != 0) {
2231                 LASSERT(req == NULL);
2232
2233                 if (oa)
2234                         OBDO_FREE(oa);
2235                 if (pga)
2236                         OBD_FREE(pga, sizeof(*pga) * page_count);
2237                 /* this should happen rarely and is pretty bad, it makes the
2238                  * pending list not follow the dirty order */
2239                 while (!cfs_list_empty(ext_list)) {
2240                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2241                                              oe_link);
2242                         cfs_list_del_init(&ext->oe_link);
2243                         osc_extent_finish(env, ext, 0, rc);
2244                 }
2245                 if (clerq && !IS_ERR(clerq))
2246                         cl_req_completion(env, clerq, rc);
2247         }
2248         RETURN(rc);
2249 }
2250
2251 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2252                                         struct ldlm_enqueue_info *einfo)
2253 {
2254         void *data = einfo->ei_cbdata;
2255         int set = 0;
2256
2257         LASSERT(lock != NULL);
2258         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2259         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2260         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2261         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2262
2263         lock_res_and_lock(lock);
2264         spin_lock(&osc_ast_guard);
2265
2266         if (lock->l_ast_data == NULL)
2267                 lock->l_ast_data = data;
2268         if (lock->l_ast_data == data)
2269                 set = 1;
2270
2271         spin_unlock(&osc_ast_guard);
2272         unlock_res_and_lock(lock);
2273
2274         return set;
2275 }
2276
2277 static int osc_set_data_with_check(struct lustre_handle *lockh,
2278                                    struct ldlm_enqueue_info *einfo)
2279 {
2280         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2281         int set = 0;
2282
2283         if (lock != NULL) {
2284                 set = osc_set_lock_data_with_check(lock, einfo);
2285                 LDLM_LOCK_PUT(lock);
2286         } else
2287                 CERROR("lockh %p, data %p - client evicted?\n",
2288                        lockh, einfo->ei_cbdata);
2289         return set;
2290 }
2291
2292 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2293                              ldlm_iterator_t replace, void *data)
2294 {
2295         struct ldlm_res_id res_id;
2296         struct obd_device *obd = class_exp2obd(exp);
2297
2298         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2299         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2300         return 0;
2301 }
2302
2303 /* find any ldlm lock of the inode in osc
2304  * return 0    not find
2305  *        1    find one
2306  *      < 0    error */
2307 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2308                            ldlm_iterator_t replace, void *data)
2309 {
2310         struct ldlm_res_id res_id;
2311         struct obd_device *obd = class_exp2obd(exp);
2312         int rc = 0;
2313
2314         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2315         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2316         if (rc == LDLM_ITER_STOP)
2317                 return(1);
2318         if (rc == LDLM_ITER_CONTINUE)
2319                 return(0);
2320         return(rc);
2321 }
2322
2323 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2324                             obd_enqueue_update_f upcall, void *cookie,
2325                             __u64 *flags, int agl, int rc)
2326 {
2327         int intent = *flags & LDLM_FL_HAS_INTENT;
2328         ENTRY;
2329
2330         if (intent) {
2331                 /* The request was created before ldlm_cli_enqueue call. */
2332                 if (rc == ELDLM_LOCK_ABORTED) {
2333                         struct ldlm_reply *rep;
2334                         rep = req_capsule_server_get(&req->rq_pill,
2335                                                      &RMF_DLM_REP);
2336
2337                         LASSERT(rep != NULL);
2338                         if (rep->lock_policy_res1)
2339                                 rc = rep->lock_policy_res1;
2340                 }
2341         }
2342
2343         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2344             (rc == 0)) {
2345                 *flags |= LDLM_FL_LVB_READY;
2346                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2347                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2348         }
2349
2350         /* Call the update callback. */
2351         rc = (*upcall)(cookie, rc);
2352         RETURN(rc);
2353 }
2354
2355 static int osc_enqueue_interpret(const struct lu_env *env,
2356                                  struct ptlrpc_request *req,
2357                                  struct osc_enqueue_args *aa, int rc)
2358 {
2359         struct ldlm_lock *lock;
2360         struct lustre_handle handle;
2361         __u32 mode;
2362         struct ost_lvb *lvb;
2363         __u32 lvb_len;
2364         __u64 *flags = aa->oa_flags;
2365
2366         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2367          * might be freed anytime after lock upcall has been called. */
2368         lustre_handle_copy(&handle, aa->oa_lockh);
2369         mode = aa->oa_ei->ei_mode;
2370
2371         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2372          * be valid. */
2373         lock = ldlm_handle2lock(&handle);
2374
2375         /* Take an additional reference so that a blocking AST that
2376          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2377          * to arrive after an upcall has been executed by
2378          * osc_enqueue_fini(). */
2379         ldlm_lock_addref(&handle, mode);
2380
2381         /* Let CP AST to grant the lock first. */
2382         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2383
2384         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2385                 lvb = NULL;
2386                 lvb_len = 0;
2387         } else {
2388                 lvb = aa->oa_lvb;
2389                 lvb_len = sizeof(*aa->oa_lvb);
2390         }
2391
2392         /* Complete obtaining the lock procedure. */
2393         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2394                                    mode, flags, lvb, lvb_len, &handle, rc);
2395         /* Complete osc stuff. */
2396         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2397                               flags, aa->oa_agl, rc);
2398
2399         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2400
2401         /* Release the lock for async request. */
2402         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2403                 /*
2404                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2405                  * not already released by
2406                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2407                  */
2408                 ldlm_lock_decref(&handle, mode);
2409
2410         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2411                  aa->oa_lockh, req, aa);
2412         ldlm_lock_decref(&handle, mode);
2413         LDLM_LOCK_PUT(lock);
2414         return rc;
2415 }
2416
2417 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2418                         struct lov_oinfo *loi, int flags,
2419                         struct ost_lvb *lvb, __u32 mode, int rc)
2420 {
2421         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2422
2423         if (rc == ELDLM_OK) {
2424                 __u64 tmp;
2425
2426                 LASSERT(lock != NULL);
2427                 loi->loi_lvb = *lvb;
2428                 tmp = loi->loi_lvb.lvb_size;
2429                 /* Extend KMS up to the end of this lock and no further
2430                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2431                 if (tmp > lock->l_policy_data.l_extent.end)
2432                         tmp = lock->l_policy_data.l_extent.end + 1;
2433                 if (tmp >= loi->loi_kms) {
2434                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2435                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2436                         loi_kms_set(loi, tmp);
2437                 } else {
2438                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2439                                    LPU64"; leaving kms="LPU64", end="LPU64,
2440                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2441                                    lock->l_policy_data.l_extent.end);
2442                 }
2443                 ldlm_lock_allow_match(lock);
2444         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2445                 LASSERT(lock != NULL);
2446                 loi->loi_lvb = *lvb;
2447                 ldlm_lock_allow_match(lock);
2448                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2449                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2450                 rc = ELDLM_OK;
2451         }
2452
2453         if (lock != NULL) {
2454                 if (rc != ELDLM_OK)
2455                         ldlm_lock_fail_match(lock);
2456
2457                 LDLM_LOCK_PUT(lock);
2458         }
2459 }
2460 EXPORT_SYMBOL(osc_update_enqueue);
2461
2462 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2463
2464 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2465  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2466  * other synchronous requests, however keeping some locks and trying to obtain
2467  * others may take a considerable amount of time in a case of ost failure; and
2468  * when other sync requests do not get released lock from a client, the client
2469  * is excluded from the cluster -- such scenarious make the life difficult, so
2470  * release locks just after they are obtained. */
2471 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2472                      __u64 *flags, ldlm_policy_data_t *policy,
2473                      struct ost_lvb *lvb, int kms_valid,
2474                      obd_enqueue_update_f upcall, void *cookie,
2475                      struct ldlm_enqueue_info *einfo,
2476                      struct lustre_handle *lockh,
2477                      struct ptlrpc_request_set *rqset, int async, int agl)
2478 {
2479         struct obd_device *obd = exp->exp_obd;
2480         struct ptlrpc_request *req = NULL;
2481         int intent = *flags & LDLM_FL_HAS_INTENT;
2482         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2483         ldlm_mode_t mode;
2484         int rc;
2485         ENTRY;
2486
2487         /* Filesystem lock extents are extended to page boundaries so that
2488          * dealing with the page cache is a little smoother.  */
2489         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2490         policy->l_extent.end |= ~CFS_PAGE_MASK;
2491
2492         /*
2493          * kms is not valid when either object is completely fresh (so that no
2494          * locks are cached), or object was evicted. In the latter case cached
2495          * lock cannot be used, because it would prime inode state with
2496          * potentially stale LVB.
2497          */
2498         if (!kms_valid)
2499                 goto no_match;
2500
2501         /* Next, search for already existing extent locks that will cover us */
2502         /* If we're trying to read, we also search for an existing PW lock.  The
2503          * VFS and page cache already protect us locally, so lots of readers/
2504          * writers can share a single PW lock.
2505          *
2506          * There are problems with conversion deadlocks, so instead of
2507          * converting a read lock to a write lock, we'll just enqueue a new
2508          * one.
2509          *
2510          * At some point we should cancel the read lock instead of making them
2511          * send us a blocking callback, but there are problems with canceling
2512          * locks out from other users right now, too. */
2513         mode = einfo->ei_mode;
2514         if (einfo->ei_mode == LCK_PR)
2515                 mode |= LCK_PW;
2516         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2517                                einfo->ei_type, policy, mode, lockh, 0);
2518         if (mode) {
2519                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2520
2521                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2522                         /* For AGL, if enqueue RPC is sent but the lock is not
2523                          * granted, then skip to process this strpe.
2524                          * Return -ECANCELED to tell the caller. */
2525                         ldlm_lock_decref(lockh, mode);
2526                         LDLM_LOCK_PUT(matched);
2527                         RETURN(-ECANCELED);
2528                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2529                         *flags |= LDLM_FL_LVB_READY;
2530                         /* addref the lock only if not async requests and PW
2531                          * lock is matched whereas we asked for PR. */
2532                         if (!rqset && einfo->ei_mode != mode)
2533                                 ldlm_lock_addref(lockh, LCK_PR);
2534                         if (intent) {
2535                                 /* I would like to be able to ASSERT here that
2536                                  * rss <= kms, but I can't, for reasons which
2537                                  * are explained in lov_enqueue() */
2538                         }
2539
2540                         /* We already have a lock, and it's referenced.
2541                          *
2542                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2543                          * AGL upcall may change it to CLS_HELD directly. */
2544                         (*upcall)(cookie, ELDLM_OK);
2545
2546                         if (einfo->ei_mode != mode)
2547                                 ldlm_lock_decref(lockh, LCK_PW);
2548                         else if (rqset)
2549                                 /* For async requests, decref the lock. */
2550                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2551                         LDLM_LOCK_PUT(matched);
2552                         RETURN(ELDLM_OK);
2553                 } else {
2554                         ldlm_lock_decref(lockh, mode);
2555                         LDLM_LOCK_PUT(matched);
2556                 }
2557         }
2558
2559  no_match:
2560         if (intent) {
2561                 CFS_LIST_HEAD(cancels);
2562                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2563                                            &RQF_LDLM_ENQUEUE_LVB);
2564                 if (req == NULL)
2565                         RETURN(-ENOMEM);
2566
2567                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2568                 if (rc) {
2569                         ptlrpc_request_free(req);
2570                         RETURN(rc);
2571                 }
2572
2573                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2574                                      sizeof *lvb);
2575                 ptlrpc_request_set_replen(req);
2576         }
2577
2578         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2579         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2580
2581         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2582                               sizeof(*lvb), LVB_T_OST, lockh, async);
2583         if (rqset) {
2584                 if (!rc) {
2585                         struct osc_enqueue_args *aa;
2586                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2587                         aa = ptlrpc_req_async_args(req);
2588                         aa->oa_ei = einfo;
2589                         aa->oa_exp = exp;
2590                         aa->oa_flags  = flags;
2591                         aa->oa_upcall = upcall;
2592                         aa->oa_cookie = cookie;
2593                         aa->oa_lvb    = lvb;
2594                         aa->oa_lockh  = lockh;
2595                         aa->oa_agl    = !!agl;
2596
2597                         req->rq_interpret_reply =
2598                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2599                         if (rqset == PTLRPCD_SET)
2600                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2601                         else
2602                                 ptlrpc_set_add_req(rqset, req);
2603                 } else if (intent) {
2604                         ptlrpc_req_finished(req);
2605                 }
2606                 RETURN(rc);
2607         }
2608
2609         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2610         if (intent)
2611                 ptlrpc_req_finished(req);
2612
2613         RETURN(rc);
2614 }
2615
2616 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2617                        struct ldlm_enqueue_info *einfo,
2618                        struct ptlrpc_request_set *rqset)
2619 {
2620         struct ldlm_res_id res_id;
2621         int rc;
2622         ENTRY;
2623
2624         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2625                            oinfo->oi_md->lsm_object_seq, &res_id);
2626
2627         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2628                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2629                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2630                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2631                               rqset, rqset != NULL, 0);
2632         RETURN(rc);
2633 }
2634
2635 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2636                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2637                    int *flags, void *data, struct lustre_handle *lockh,
2638                    int unref)
2639 {
2640         struct obd_device *obd = exp->exp_obd;
2641         int lflags = *flags;
2642         ldlm_mode_t rc;
2643         ENTRY;
2644
2645         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2646                 RETURN(-EIO);
2647
2648         /* Filesystem lock extents are extended to page boundaries so that
2649          * dealing with the page cache is a little smoother */
2650         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2651         policy->l_extent.end |= ~CFS_PAGE_MASK;
2652
2653         /* Next, search for already existing extent locks that will cover us */
2654         /* If we're trying to read, we also search for an existing PW lock.  The
2655          * VFS and page cache already protect us locally, so lots of readers/
2656          * writers can share a single PW lock. */
2657         rc = mode;
2658         if (mode == LCK_PR)
2659                 rc |= LCK_PW;
2660         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2661                              res_id, type, policy, rc, lockh, unref);
2662         if (rc) {
2663                 if (data != NULL) {
2664                         if (!osc_set_data_with_check(lockh, data)) {
2665                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2666                                         ldlm_lock_decref(lockh, rc);
2667                                 RETURN(0);
2668                         }
2669                 }
2670                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2671                         ldlm_lock_addref(lockh, LCK_PR);
2672                         ldlm_lock_decref(lockh, LCK_PW);
2673                 }
2674                 RETURN(rc);
2675         }
2676         RETURN(rc);
2677 }
2678
2679 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2680 {
2681         ENTRY;
2682
2683         if (unlikely(mode == LCK_GROUP))
2684                 ldlm_lock_decref_and_cancel(lockh, mode);
2685         else
2686                 ldlm_lock_decref(lockh, mode);
2687
2688         RETURN(0);
2689 }
2690
2691 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2692                       __u32 mode, struct lustre_handle *lockh)
2693 {
2694         ENTRY;
2695         RETURN(osc_cancel_base(lockh, mode));
2696 }
2697
2698 static int osc_cancel_unused(struct obd_export *exp,
2699                              struct lov_stripe_md *lsm,
2700                              ldlm_cancel_flags_t flags,
2701                              void *opaque)
2702 {
2703         struct obd_device *obd = class_exp2obd(exp);
2704         struct ldlm_res_id res_id, *resp = NULL;
2705
2706         if (lsm != NULL) {
2707                 resp = osc_build_res_name(lsm->lsm_object_id,
2708                                           lsm->lsm_object_seq, &res_id);
2709         }
2710
2711         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2712 }
2713
2714 static int osc_statfs_interpret(const struct lu_env *env,
2715                                 struct ptlrpc_request *req,
2716                                 struct osc_async_args *aa, int rc)
2717 {
2718         struct obd_statfs *msfs;
2719         ENTRY;
2720
2721         if (rc == -EBADR)
2722                 /* The request has in fact never been sent
2723                  * due to issues at a higher level (LOV).
2724                  * Exit immediately since the caller is
2725                  * aware of the problem and takes care
2726                  * of the clean up */
2727                  RETURN(rc);
2728
2729         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2730             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2731                 GOTO(out, rc = 0);
2732
2733         if (rc != 0)
2734                 GOTO(out, rc);
2735
2736         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2737         if (msfs == NULL) {
2738                 GOTO(out, rc = -EPROTO);
2739         }
2740
2741         *aa->aa_oi->oi_osfs = *msfs;
2742 out:
2743         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2744         RETURN(rc);
2745 }
2746
2747 static int osc_statfs_async(struct obd_export *exp,
2748                             struct obd_info *oinfo, __u64 max_age,
2749                             struct ptlrpc_request_set *rqset)
2750 {
2751         struct obd_device     *obd = class_exp2obd(exp);
2752         struct ptlrpc_request *req;
2753         struct osc_async_args *aa;
2754         int                    rc;
2755         ENTRY;
2756
2757         /* We could possibly pass max_age in the request (as an absolute
2758          * timestamp or a "seconds.usec ago") so the target can avoid doing
2759          * extra calls into the filesystem if that isn't necessary (e.g.
2760          * during mount that would help a bit).  Having relative timestamps
2761          * is not so great if request processing is slow, while absolute
2762          * timestamps are not ideal because they need time synchronization. */
2763         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2764         if (req == NULL)
2765                 RETURN(-ENOMEM);
2766
2767         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2768         if (rc) {
2769                 ptlrpc_request_free(req);
2770                 RETURN(rc);
2771         }
2772         ptlrpc_request_set_replen(req);
2773         req->rq_request_portal = OST_CREATE_PORTAL;
2774         ptlrpc_at_set_req_timeout(req);
2775
2776         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2777                 /* procfs requests not want stat in wait for avoid deadlock */
2778                 req->rq_no_resend = 1;
2779                 req->rq_no_delay = 1;
2780         }
2781
2782         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2783         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2784         aa = ptlrpc_req_async_args(req);
2785         aa->aa_oi = oinfo;
2786
2787         ptlrpc_set_add_req(rqset, req);
2788         RETURN(0);
2789 }
2790
2791 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2792                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2793 {
2794         struct obd_device     *obd = class_exp2obd(exp);
2795         struct obd_statfs     *msfs;
2796         struct ptlrpc_request *req;
2797         struct obd_import     *imp = NULL;
2798         int rc;
2799         ENTRY;
2800
2801         /*Since the request might also come from lprocfs, so we need
2802          *sync this with client_disconnect_export Bug15684*/
2803         down_read(&obd->u.cli.cl_sem);
2804         if (obd->u.cli.cl_import)
2805                 imp = class_import_get(obd->u.cli.cl_import);
2806         up_read(&obd->u.cli.cl_sem);
2807         if (!imp)
2808                 RETURN(-ENODEV);
2809
2810         /* We could possibly pass max_age in the request (as an absolute
2811          * timestamp or a "seconds.usec ago") so the target can avoid doing
2812          * extra calls into the filesystem if that isn't necessary (e.g.
2813          * during mount that would help a bit).  Having relative timestamps
2814          * is not so great if request processing is slow, while absolute
2815          * timestamps are not ideal because they need time synchronization. */
2816         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2817
2818         class_import_put(imp);
2819
2820         if (req == NULL)
2821                 RETURN(-ENOMEM);
2822
2823         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2824         if (rc) {
2825                 ptlrpc_request_free(req);
2826                 RETURN(rc);
2827         }
2828         ptlrpc_request_set_replen(req);
2829         req->rq_request_portal = OST_CREATE_PORTAL;
2830         ptlrpc_at_set_req_timeout(req);
2831
2832         if (flags & OBD_STATFS_NODELAY) {
2833                 /* procfs requests not want stat in wait for avoid deadlock */
2834                 req->rq_no_resend = 1;
2835                 req->rq_no_delay = 1;
2836         }
2837
2838         rc = ptlrpc_queue_wait(req);
2839         if (rc)
2840                 GOTO(out, rc);
2841
2842         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2843         if (msfs == NULL) {
2844                 GOTO(out, rc = -EPROTO);
2845         }
2846
2847         *osfs = *msfs;
2848
2849         EXIT;
2850  out:
2851         ptlrpc_req_finished(req);
2852         return rc;
2853 }
2854
2855 /* Retrieve object striping information.
2856  *
2857  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2858  * the maximum number of OST indices which will fit in the user buffer.
2859  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2860  */
2861 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2862 {
2863         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2864         struct lov_user_md_v3 lum, *lumk;
2865         struct lov_user_ost_data_v1 *lmm_objects;
2866         int rc = 0, lum_size;
2867         ENTRY;
2868
2869         if (!lsm)
2870                 RETURN(-ENODATA);
2871
2872         /* we only need the header part from user space to get lmm_magic and
2873          * lmm_stripe_count, (the header part is common to v1 and v3) */
2874         lum_size = sizeof(struct lov_user_md_v1);
2875         if (cfs_copy_from_user(&lum, lump, lum_size))
2876                 RETURN(-EFAULT);
2877
2878         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2879             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2880                 RETURN(-EINVAL);
2881
2882         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2883         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2884         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2885         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2886
2887         /* we can use lov_mds_md_size() to compute lum_size
2888          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2889         if (lum.lmm_stripe_count > 0) {
2890                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2891                 OBD_ALLOC(lumk, lum_size);
2892                 if (!lumk)
2893                         RETURN(-ENOMEM);
2894
2895                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2896                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2897                 else
2898                         lmm_objects = &(lumk->lmm_objects[0]);
2899                 lmm_objects->l_object_id = lsm->lsm_object_id;
2900         } else {
2901                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2902                 lumk = &lum;
2903         }
2904
2905         lumk->lmm_object_id = lsm->lsm_object_id;
2906         lumk->lmm_object_seq = lsm->lsm_object_seq;
2907         lumk->lmm_stripe_count = 1;
2908
2909         if (cfs_copy_to_user(lump, lumk, lum_size))
2910                 rc = -EFAULT;
2911
2912         if (lumk != &lum)
2913                 OBD_FREE(lumk, lum_size);
2914
2915         RETURN(rc);
2916 }
2917
2918
2919 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2920                          void *karg, void *uarg)
2921 {
2922         struct obd_device *obd = exp->exp_obd;
2923         struct obd_ioctl_data *data = karg;
2924         int err = 0;
2925         ENTRY;
2926
2927         if (!cfs_try_module_get(THIS_MODULE)) {
2928                 CERROR("Can't get module. Is it alive?");
2929                 return -EINVAL;
2930         }
2931         switch (cmd) {
2932         case OBD_IOC_LOV_GET_CONFIG: {
2933                 char *buf;
2934                 struct lov_desc *desc;
2935                 struct obd_uuid uuid;
2936
2937                 buf = NULL;
2938                 len = 0;
2939                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2940                         GOTO(out, err = -EINVAL);
2941
2942                 data = (struct obd_ioctl_data *)buf;
2943
2944                 if (sizeof(*desc) > data->ioc_inllen1) {
2945                         obd_ioctl_freedata(buf, len);
2946                         GOTO(out, err = -EINVAL);
2947                 }
2948
2949                 if (data->ioc_inllen2 < sizeof(uuid)) {
2950                         obd_ioctl_freedata(buf, len);
2951                         GOTO(out, err = -EINVAL);
2952                 }
2953
2954                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2955                 desc->ld_tgt_count = 1;
2956                 desc->ld_active_tgt_count = 1;
2957                 desc->ld_default_stripe_count = 1;
2958                 desc->ld_default_stripe_size = 0;
2959                 desc->ld_default_stripe_offset = 0;
2960                 desc->ld_pattern = 0;
2961                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2962
2963                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2964
2965                 err = cfs_copy_to_user((void *)uarg, buf, len);
2966                 if (err)
2967                         err = -EFAULT;
2968                 obd_ioctl_freedata(buf, len);
2969                 GOTO(out, err);
2970         }
2971         case LL_IOC_LOV_SETSTRIPE:
2972                 err = obd_alloc_memmd(exp, karg);
2973                 if (err > 0)
2974                         err = 0;
2975                 GOTO(out, err);
2976         case LL_IOC_LOV_GETSTRIPE:
2977                 err = osc_getstripe(karg, uarg);
2978                 GOTO(out, err);
2979         case OBD_IOC_CLIENT_RECOVER:
2980                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2981                                             data->ioc_inlbuf1, 0);
2982                 if (err > 0)
2983                         err = 0;
2984                 GOTO(out, err);
2985         case IOC_OSC_SET_ACTIVE:
2986                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2987                                                data->ioc_offset);
2988                 GOTO(out, err);
2989         case OBD_IOC_POLL_QUOTACHECK:
2990                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2991                 GOTO(out, err);
2992         case OBD_IOC_PING_TARGET:
2993                 err = ptlrpc_obd_ping(obd);
2994                 GOTO(out, err);
2995         default:
2996                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2997                        cmd, cfs_curproc_comm());
2998                 GOTO(out, err = -ENOTTY);
2999         }
3000 out:
3001         cfs_module_put(THIS_MODULE);
3002         return err;
3003 }
3004
3005 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3006                         obd_count keylen, void *key, __u32 *vallen, void *val,
3007                         struct lov_stripe_md *lsm)
3008 {
3009         ENTRY;
3010         if (!vallen || !val)
3011                 RETURN(-EFAULT);
3012
3013         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3014                 __u32 *stripe = val;
3015                 *vallen = sizeof(*stripe);
3016                 *stripe = 0;
3017                 RETURN(0);
3018         } else if (KEY_IS(KEY_LAST_ID)) {
3019                 struct ptlrpc_request *req;
3020                 obd_id                *reply;
3021                 char                  *tmp;
3022                 int                    rc;
3023
3024                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3025                                            &RQF_OST_GET_INFO_LAST_ID);
3026                 if (req == NULL)
3027                         RETURN(-ENOMEM);
3028
3029                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3030                                      RCL_CLIENT, keylen);
3031                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3032                 if (rc) {
3033                         ptlrpc_request_free(req);
3034                         RETURN(rc);
3035                 }
3036
3037                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3038                 memcpy(tmp, key, keylen);
3039
3040                 req->rq_no_delay = req->rq_no_resend = 1;
3041                 ptlrpc_request_set_replen(req);
3042                 rc = ptlrpc_queue_wait(req);
3043                 if (rc)
3044                         GOTO(out, rc);
3045
3046                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3047                 if (reply == NULL)
3048                         GOTO(out, rc = -EPROTO);
3049
3050                 *((obd_id *)val) = *reply;
3051         out:
3052                 ptlrpc_req_finished(req);
3053                 RETURN(rc);
3054         } else if (KEY_IS(KEY_FIEMAP)) {
3055                 struct ptlrpc_request *req;
3056                 struct ll_user_fiemap *reply;
3057                 char *tmp;
3058                 int rc;
3059
3060                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3061                                            &RQF_OST_GET_INFO_FIEMAP);
3062                 if (req == NULL)
3063                         RETURN(-ENOMEM);
3064
3065                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3066                                      RCL_CLIENT, keylen);
3067                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3068                                      RCL_CLIENT, *vallen);
3069                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3070                                      RCL_SERVER, *vallen);
3071
3072                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3073                 if (rc) {
3074                         ptlrpc_request_free(req);
3075                         RETURN(rc);
3076                 }
3077
3078                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3079                 memcpy(tmp, key, keylen);
3080                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3081                 memcpy(tmp, val, *vallen);
3082
3083                 ptlrpc_request_set_replen(req);
3084                 rc = ptlrpc_queue_wait(req);
3085                 if (rc)
3086                         GOTO(out1, rc);
3087
3088                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3089                 if (reply == NULL)
3090                         GOTO(out1, rc = -EPROTO);
3091
3092                 memcpy(val, reply, *vallen);
3093         out1:
3094                 ptlrpc_req_finished(req);
3095
3096                 RETURN(rc);
3097         }
3098
3099         RETURN(-EINVAL);
3100 }
3101
3102 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3103                               obd_count keylen, void *key, obd_count vallen,
3104                               void *val, struct ptlrpc_request_set *set)
3105 {
3106         struct ptlrpc_request *req;
3107         struct obd_device     *obd = exp->exp_obd;
3108         struct obd_import     *imp = class_exp2cliimp(exp);
3109         char                  *tmp;
3110         int                    rc;
3111         ENTRY;
3112
3113         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3114
3115         if (KEY_IS(KEY_CHECKSUM)) {
3116                 if (vallen != sizeof(int))
3117                         RETURN(-EINVAL);
3118                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3119                 RETURN(0);
3120         }
3121
3122         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3123                 sptlrpc_conf_client_adapt(obd);
3124                 RETURN(0);
3125         }
3126
3127         if (KEY_IS(KEY_FLUSH_CTX)) {
3128                 sptlrpc_import_flush_my_ctx(imp);
3129                 RETURN(0);
3130         }
3131
3132         if (KEY_IS(KEY_CACHE_SET)) {
3133                 struct client_obd *cli = &obd->u.cli;
3134
3135                 LASSERT(cli->cl_cache == NULL); /* only once */
3136                 cli->cl_cache = (struct cl_client_cache *)val;
3137                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3138                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3139
3140                 /* add this osc into entity list */
3141                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3142                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3143                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3144                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3145
3146                 RETURN(0);
3147         }
3148
3149         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3150                 struct client_obd *cli = &obd->u.cli;
3151                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3152                 int target = *(int *)val;
3153
3154                 nr = osc_lru_shrink(cli, min(nr, target));
3155                 *(int *)val -= nr;
3156                 RETURN(0);
3157         }
3158
3159         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3160                 RETURN(-EINVAL);
3161
3162         /* We pass all other commands directly to OST. Since nobody calls osc
3163            methods directly and everybody is supposed to go through LOV, we
3164            assume lov checked invalid values for us.
3165            The only recognised values so far are evict_by_nid and mds_conn.
3166            Even if something bad goes through, we'd get a -EINVAL from OST
3167            anyway. */
3168
3169         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3170                                                 &RQF_OST_SET_GRANT_INFO :
3171                                                 &RQF_OBD_SET_INFO);
3172         if (req == NULL)
3173                 RETURN(-ENOMEM);
3174
3175         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3176                              RCL_CLIENT, keylen);
3177         if (!KEY_IS(KEY_GRANT_SHRINK))
3178                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3179                                      RCL_CLIENT, vallen);
3180         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3181         if (rc) {
3182                 ptlrpc_request_free(req);
3183                 RETURN(rc);
3184         }
3185
3186         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3187         memcpy(tmp, key, keylen);
3188         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3189                                                         &RMF_OST_BODY :
3190                                                         &RMF_SETINFO_VAL);
3191         memcpy(tmp, val, vallen);
3192
3193         if (KEY_IS(KEY_GRANT_SHRINK)) {
3194                 struct osc_grant_args *aa;
3195                 struct obdo *oa;
3196
3197                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3198                 aa = ptlrpc_req_async_args(req);
3199                 OBDO_ALLOC(oa);
3200                 if (!oa) {
3201                         ptlrpc_req_finished(req);
3202                         RETURN(-ENOMEM);
3203                 }
3204                 *oa = ((struct ost_body *)val)->oa;
3205                 aa->aa_oa = oa;
3206                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3207         }
3208
3209         ptlrpc_request_set_replen(req);
3210         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3211                 LASSERT(set != NULL);
3212                 ptlrpc_set_add_req(set, req);
3213                 ptlrpc_check_set(NULL, set);
3214         } else
3215                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3216
3217         RETURN(0);
3218 }
3219
3220
3221 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3222                          struct obd_device *disk_obd, int *index)
3223 {
3224         /* this code is not supposed to be used with LOD/OSP
3225          * to be removed soon */
3226         LBUG();
3227         return 0;
3228 }
3229
3230 static int osc_llog_finish(struct obd_device *obd, int count)
3231 {
3232         struct llog_ctxt *ctxt;
3233
3234         ENTRY;
3235
3236         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3237         if (ctxt) {
3238                 llog_cat_close(NULL, ctxt->loc_handle);
3239                 llog_cleanup(NULL, ctxt);
3240         }
3241
3242         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3243         if (ctxt)
3244                 llog_cleanup(NULL, ctxt);
3245         RETURN(0);
3246 }
3247
3248 static int osc_reconnect(const struct lu_env *env,
3249                          struct obd_export *exp, struct obd_device *obd,
3250                          struct obd_uuid *cluuid,
3251                          struct obd_connect_data *data,
3252                          void *localdata)
3253 {
3254         struct client_obd *cli = &obd->u.cli;
3255
3256         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3257                 long lost_grant;
3258
3259                 client_obd_list_lock(&cli->cl_loi_list_lock);
3260                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3261                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3262                 lost_grant = cli->cl_lost_grant;
3263                 cli->cl_lost_grant = 0;
3264                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3265
3266                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3267                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3268                        data->ocd_version, data->ocd_grant, lost_grant);
3269         }
3270
3271         RETURN(0);
3272 }
3273
3274 static int osc_disconnect(struct obd_export *exp)
3275 {
3276         struct obd_device *obd = class_exp2obd(exp);
3277         struct llog_ctxt  *ctxt;
3278         int rc;
3279
3280         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3281         if (ctxt) {
3282                 if (obd->u.cli.cl_conn_count == 1) {
3283                         /* Flush any remaining cancel messages out to the
3284                          * target */
3285                         llog_sync(ctxt, exp, 0);
3286                 }
3287                 llog_ctxt_put(ctxt);
3288         } else {
3289                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3290                        obd);
3291         }
3292
3293         rc = client_disconnect_export(exp);
3294         /**
3295          * Initially we put del_shrink_grant before disconnect_export, but it
3296          * causes the following problem if setup (connect) and cleanup
3297          * (disconnect) are tangled together.
3298          *      connect p1                     disconnect p2
3299          *   ptlrpc_connect_import
3300          *     ...............               class_manual_cleanup
3301          *                                     osc_disconnect
3302          *                                     del_shrink_grant
3303          *   ptlrpc_connect_interrupt
3304          *     init_grant_shrink
3305          *   add this client to shrink list
3306          *                                      cleanup_osc
3307          * Bang! pinger trigger the shrink.
3308          * So the osc should be disconnected from the shrink list, after we
3309          * are sure the import has been destroyed. BUG18662
3310          */
3311         if (obd->u.cli.cl_import == NULL)
3312                 osc_del_shrink_grant(&obd->u.cli);
3313         return rc;
3314 }
3315
3316 static int osc_import_event(struct obd_device *obd,
3317                             struct obd_import *imp,
3318                             enum obd_import_event event)
3319 {
3320         struct client_obd *cli;
3321         int rc = 0;
3322
3323         ENTRY;
3324         LASSERT(imp->imp_obd == obd);
3325
3326         switch (event) {
3327         case IMP_EVENT_DISCON: {
3328                 cli = &obd->u.cli;
3329                 client_obd_list_lock(&cli->cl_loi_list_lock);
3330                 cli->cl_avail_grant = 0;
3331                 cli->cl_lost_grant = 0;
3332                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3333                 break;
3334         }
3335         case IMP_EVENT_INACTIVE: {
3336                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3337                 break;
3338         }
3339         case IMP_EVENT_INVALIDATE: {
3340                 struct ldlm_namespace *ns = obd->obd_namespace;
3341                 struct lu_env         *env;
3342                 int                    refcheck;
3343
3344                 env = cl_env_get(&refcheck);
3345                 if (!IS_ERR(env)) {
3346                         /* Reset grants */
3347                         cli = &obd->u.cli;
3348                         /* all pages go to failing rpcs due to the invalid
3349                          * import */
3350                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3351
3352                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3353                         cl_env_put(env, &refcheck);
3354                 } else
3355                         rc = PTR_ERR(env);
3356                 break;
3357         }
3358         case IMP_EVENT_ACTIVE: {
3359                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3360                 break;
3361         }
3362         case IMP_EVENT_OCD: {
3363                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3364
3365                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3366                         osc_init_grant(&obd->u.cli, ocd);
3367
3368                 /* See bug 7198 */
3369                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3370                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3371
3372                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3373                 break;
3374         }
3375         case IMP_EVENT_DEACTIVATE: {
3376                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3377                 break;
3378         }
3379         case IMP_EVENT_ACTIVATE: {
3380                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3381                 break;
3382         }
3383         default:
3384                 CERROR("Unknown import event %d\n", event);
3385                 LBUG();
3386         }
3387         RETURN(rc);
3388 }
3389
3390 /**
3391  * Determine whether the lock can be canceled before replaying the lock
3392  * during recovery, see bug16774 for detailed information.
3393  *
3394  * \retval zero the lock can't be canceled
3395  * \retval other ok to cancel
3396  */
3397 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3398 {
3399         check_res_locked(lock->l_resource);
3400
3401         /*
3402          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3403          *
3404          * XXX as a future improvement, we can also cancel unused write lock
3405          * if it doesn't have dirty data and active mmaps.
3406          */
3407         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3408             (lock->l_granted_mode == LCK_PR ||
3409              lock->l_granted_mode == LCK_CR) &&
3410             (osc_dlm_lock_pageref(lock) == 0))
3411                 RETURN(1);
3412
3413         RETURN(0);
3414 }
3415
3416 static int brw_queue_work(const struct lu_env *env, void *data)
3417 {
3418         struct client_obd *cli = data;
3419
3420         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3421
3422         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3423         RETURN(0);
3424 }
3425
3426 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3427 {
3428         struct lprocfs_static_vars lvars = { 0 };
3429         struct client_obd          *cli = &obd->u.cli;
3430         void                       *handler;
3431         int                        rc;
3432         ENTRY;
3433
3434         rc = ptlrpcd_addref();
3435         if (rc)
3436                 RETURN(rc);
3437
3438         rc = client_obd_setup(obd, lcfg);
3439         if (rc)
3440                 GOTO(out_ptlrpcd, rc);
3441
3442         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3443         if (IS_ERR(handler))
3444                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3445         cli->cl_writeback_work = handler;
3446
3447         rc = osc_quota_setup(obd);
3448         if (rc)
3449                 GOTO(out_ptlrpcd_work, rc);
3450
3451         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3452         lprocfs_osc_init_vars(&lvars);
3453         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3454                 lproc_osc_attach_seqstat(obd);
3455                 sptlrpc_lprocfs_cliobd_attach(obd);
3456                 ptlrpc_lprocfs_register_obd(obd);
3457         }
3458
3459         /* We need to allocate a few requests more, because
3460          * brw_interpret tries to create new requests before freeing
3461          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3462          * reserved, but I'm afraid that might be too much wasted RAM
3463          * in fact, so 2 is just my guess and still should work. */
3464         cli->cl_import->imp_rq_pool =
3465                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3466                                     OST_MAXREQSIZE,
3467                                     ptlrpc_add_rqs_to_pool);
3468
3469         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3470         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3471         RETURN(rc);
3472
3473 out_ptlrpcd_work:
3474         ptlrpcd_destroy_work(handler);
3475 out_client_setup:
3476         client_obd_cleanup(obd);
3477 out_ptlrpcd:
3478         ptlrpcd_decref();
3479         RETURN(rc);
3480 }
3481
3482 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3483 {
3484         int rc = 0;
3485         ENTRY;
3486
3487         switch (stage) {
3488         case OBD_CLEANUP_EARLY: {
3489                 struct obd_import *imp;
3490                 imp = obd->u.cli.cl_import;
3491                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3492                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3493                 ptlrpc_deactivate_import(imp);
3494                 spin_lock(&imp->imp_lock);
3495                 imp->imp_pingable = 0;
3496                 spin_unlock(&imp->imp_lock);
3497                 break;
3498         }
3499         case OBD_CLEANUP_EXPORTS: {
3500                 struct client_obd *cli = &obd->u.cli;
3501                 /* LU-464
3502                  * for echo client, export may be on zombie list, wait for
3503                  * zombie thread to cull it, because cli.cl_import will be
3504                  * cleared in client_disconnect_export():
3505                  *   class_export_destroy() -> obd_cleanup() ->
3506                  *   echo_device_free() -> echo_client_cleanup() ->
3507                  *   obd_disconnect() -> osc_disconnect() ->
3508                  *   client_disconnect_export()
3509                  */
3510                 obd_zombie_barrier();
3511                 if (cli->cl_writeback_work) {
3512                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3513                         cli->cl_writeback_work = NULL;
3514                 }
3515                 obd_cleanup_client_import(obd);
3516                 ptlrpc_lprocfs_unregister_obd(obd);
3517                 lprocfs_obd_cleanup(obd);
3518                 rc = obd_llog_finish(obd, 0);
3519                 if (rc != 0)
3520                         CERROR("failed to cleanup llogging subsystems\n");
3521                 break;
3522                 }
3523         }
3524         RETURN(rc);
3525 }
3526
3527 int osc_cleanup(struct obd_device *obd)
3528 {
3529         struct client_obd *cli = &obd->u.cli;
3530         int rc;
3531
3532         ENTRY;
3533
3534         /* lru cleanup */
3535         if (cli->cl_cache != NULL) {
3536                 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3537                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3538                 cfs_list_del_init(&cli->cl_lru_osc);
3539                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3540                 cli->cl_lru_left = NULL;
3541                 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3542                 cli->cl_cache = NULL;
3543         }
3544
3545         /* free memory of osc quota cache */
3546         osc_quota_cleanup(obd);
3547
3548         rc = client_obd_cleanup(obd);
3549
3550         ptlrpcd_decref();
3551         RETURN(rc);
3552 }
3553
3554 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3555 {
3556         struct lprocfs_static_vars lvars = { 0 };
3557         int rc = 0;
3558
3559         lprocfs_osc_init_vars(&lvars);
3560
3561         switch (lcfg->lcfg_command) {
3562         default:
3563                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3564                                               lcfg, obd);
3565                 if (rc > 0)
3566                         rc = 0;
3567                 break;
3568         }
3569
3570         return(rc);
3571 }
3572
3573 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3574 {
3575         return osc_process_config_base(obd, buf);
3576 }
3577
3578 struct obd_ops osc_obd_ops = {
3579         .o_owner                = THIS_MODULE,
3580         .o_setup                = osc_setup,
3581         .o_precleanup           = osc_precleanup,
3582         .o_cleanup              = osc_cleanup,
3583         .o_add_conn             = client_import_add_conn,
3584         .o_del_conn             = client_import_del_conn,
3585         .o_connect              = client_connect_import,
3586         .o_reconnect            = osc_reconnect,
3587         .o_disconnect           = osc_disconnect,
3588         .o_statfs               = osc_statfs,
3589         .o_statfs_async         = osc_statfs_async,
3590         .o_packmd               = osc_packmd,
3591         .o_unpackmd             = osc_unpackmd,
3592         .o_create               = osc_create,
3593         .o_destroy              = osc_destroy,
3594         .o_getattr              = osc_getattr,
3595         .o_getattr_async        = osc_getattr_async,
3596         .o_setattr              = osc_setattr,
3597         .o_setattr_async        = osc_setattr_async,
3598         .o_brw                  = osc_brw,
3599         .o_punch                = osc_punch,
3600         .o_sync                 = osc_sync,
3601         .o_enqueue              = osc_enqueue,
3602         .o_change_cbdata        = osc_change_cbdata,
3603         .o_find_cbdata          = osc_find_cbdata,
3604         .o_cancel               = osc_cancel,
3605         .o_cancel_unused        = osc_cancel_unused,
3606         .o_iocontrol            = osc_iocontrol,
3607         .o_get_info             = osc_get_info,
3608         .o_set_info_async       = osc_set_info_async,
3609         .o_import_event         = osc_import_event,
3610         .o_llog_init            = osc_llog_init,
3611         .o_llog_finish          = osc_llog_finish,
3612         .o_process_config       = osc_process_config,
3613         .o_quotactl             = osc_quotactl,
3614         .o_quotacheck           = osc_quotacheck,
3615 };
3616
3617 extern struct lu_kmem_descr osc_caches[];
3618 extern spinlock_t osc_ast_guard;
3619 extern struct lock_class_key osc_ast_guard_class;
3620
3621 int __init osc_init(void)
3622 {
3623         struct lprocfs_static_vars lvars = { 0 };
3624         int rc;
3625         ENTRY;
3626
3627         /* print an address of _any_ initialized kernel symbol from this
3628          * module, to allow debugging with gdb that doesn't support data
3629          * symbols from modules.*/
3630         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3631
3632         rc = lu_kmem_init(osc_caches);
3633
3634         lprocfs_osc_init_vars(&lvars);
3635
3636         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3637                                  LUSTRE_OSC_NAME, &osc_device_type);
3638         if (rc) {
3639                 lu_kmem_fini(osc_caches);
3640                 RETURN(rc);
3641         }
3642
3643         spin_lock_init(&osc_ast_guard);
3644         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3645
3646         RETURN(rc);
3647 }
3648
3649 #ifdef __KERNEL__
3650 static void /*__exit*/ osc_exit(void)
3651 {
3652         class_unregister_type(LUSTRE_OSC_NAME);
3653         lu_kmem_fini(osc_caches);
3654 }
3655
3656 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3657 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3658 MODULE_LICENSE("GPL");
3659
3660 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3661 #endif