Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                    rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458         lustre_set_wire_obdo(&body->oa, oa);
459
460         ptlrpc_request_set_replen(req);
461
462         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463             oa->o_flags == OBD_FL_DELORPHAN) {
464                 DEBUG_REQ(D_HA, req,
465                           "delorphan from OST integration");
466                 /* Don't resend the delorphan req */
467                 req->rq_no_resend = req->rq_no_delay = 1;
468         }
469
470         rc = ptlrpc_queue_wait(req);
471         if (rc)
472                 GOTO(out_req, rc);
473
474         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475         if (body == NULL)
476                 GOTO(out_req, rc = -EPROTO);
477
478         lustre_get_wire_obdo(oa, &body->oa);
479
480         /* This should really be sent by the OST */
481         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482         oa->o_valid |= OBD_MD_FLBLKSZ;
483
484         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485          * have valid lsm_oinfo data structs, so don't go touching that.
486          * This needs to be fixed in a big way.
487          */
488         lsm->lsm_object_id = oa->o_id;
489         lsm->lsm_object_seq = oa->o_seq;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_fsync_args *fa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *fa->fa_oi->oi_oa = body->oa;
585 out:
586         rc = fa->fa_upcall(fa->fa_cookie, rc);
587         RETURN(rc);
588 }
589
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591                   obd_enqueue_update_f upcall, void *cookie,
592                   struct ptlrpc_request_set *rqset)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_fsync_args *fa;
597         int                    rc;
598         ENTRY;
599
600         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601         if (req == NULL)
602                 RETURN(-ENOMEM);
603
604         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606         if (rc) {
607                 ptlrpc_request_free(req);
608                 RETURN(rc);
609         }
610
611         /* overload the size and blocks fields in the oa with start/end */
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615         osc_pack_capa(req, body, oinfo->oi_capa);
616
617         ptlrpc_request_set_replen(req);
618         req->rq_interpret_reply = osc_sync_interpret;
619
620         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621         fa = ptlrpc_req_async_args(req);
622         fa->fa_oi = oinfo;
623         fa->fa_upcall = upcall;
624         fa->fa_cookie = cookie;
625
626         if (rqset == PTLRPCD_SET)
627                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628         else
629                 ptlrpc_set_add_req(rqset, req);
630
631         RETURN (0);
632 }
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
666          * export) but disabled through procfs (flag in NS).
667          *
668          * This distinguishes from a case when ELC is not supported originally,
669          * when we still want to cancel locks in advance and just cancel them
670          * locally, without sending any RPC. */
671         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
672                 RETURN(0);
673
674         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
675         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
676         if (res == NULL)
677                 RETURN(0);
678
679         LDLM_RESOURCE_ADDREF(res);
680         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
681                                            lock_flags, 0, NULL);
682         LDLM_RESOURCE_DELREF(res);
683         ldlm_resource_putref(res);
684         RETURN(count);
685 }
686
687 static int osc_destroy_interpret(const struct lu_env *env,
688                                  struct ptlrpc_request *req, void *data,
689                                  int rc)
690 {
691         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
692
693         cfs_atomic_dec(&cli->cl_destroy_in_flight);
694         cfs_waitq_signal(&cli->cl_destroy_waitq);
695         return 0;
696 }
697
698 static int osc_can_send_destroy(struct client_obd *cli)
699 {
700         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
701             cli->cl_max_rpcs_in_flight) {
702                 /* The destroy request can be sent */
703                 return 1;
704         }
705         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
706             cli->cl_max_rpcs_in_flight) {
707                 /*
708                  * The counter has been modified between the two atomic
709                  * operations.
710                  */
711                 cfs_waitq_signal(&cli->cl_destroy_waitq);
712         }
713         return 0;
714 }
715
716 int osc_create(const struct lu_env *env, struct obd_export *exp,
717                struct obdo *oa, struct lov_stripe_md **ea,
718                struct obd_trans_info *oti)
719 {
720         int rc = 0;
721         ENTRY;
722
723         LASSERT(oa);
724         LASSERT(ea);
725         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
726
727         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
728             oa->o_flags == OBD_FL_RECREATE_OBJS) {
729                 RETURN(osc_real_create(exp, oa, ea, oti));
730         }
731
732         if (!fid_seq_is_mdt(oa->o_seq))
733                 RETURN(osc_real_create(exp, oa, ea, oti));
734
735         /* we should not get here anymore */
736         LBUG();
737
738         RETURN(rc);
739 }
740
741 /* Destroy requests can be async always on the client, and we don't even really
742  * care about the return code since the client cannot do anything at all about
743  * a destroy failure.
744  * When the MDS is unlinking a filename, it saves the file objects into a
745  * recovery llog, and these object records are cancelled when the OST reports
746  * they were destroyed and sync'd to disk (i.e. transaction committed).
747  * If the client dies, or the OST is down when the object should be destroyed,
748  * the records are not cancelled, and when the OST reconnects to the MDS next,
749  * it will retrieve the llog unlink logs and then sends the log cancellation
750  * cookies to the MDS after committing destroy transactions. */
751 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
752                        struct obdo *oa, struct lov_stripe_md *ea,
753                        struct obd_trans_info *oti, struct obd_export *md_export,
754                        void *capa)
755 {
756         struct client_obd     *cli = &exp->exp_obd->u.cli;
757         struct ptlrpc_request *req;
758         struct ost_body       *body;
759         CFS_LIST_HEAD(cancels);
760         int rc, count;
761         ENTRY;
762
763         if (!oa) {
764                 CDEBUG(D_INFO, "oa NULL\n");
765                 RETURN(-EINVAL);
766         }
767
768         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
769                                         LDLM_FL_DISCARD_DATA);
770
771         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
772         if (req == NULL) {
773                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
774                 RETURN(-ENOMEM);
775         }
776
777         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
778         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
779                                0, &cancels, count);
780         if (rc) {
781                 ptlrpc_request_free(req);
782                 RETURN(rc);
783         }
784
785         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
786         ptlrpc_at_set_req_timeout(req);
787
788         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
789                 oa->o_lcookie = *oti->oti_logcookies;
790         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
791         LASSERT(body);
792         lustre_set_wire_obdo(&body->oa, oa);
793
794         osc_pack_capa(req, body, (struct obd_capa *)capa);
795         ptlrpc_request_set_replen(req);
796
797         /* If osc_destory is for destroying the unlink orphan,
798          * sent from MDT to OST, which should not be blocked here,
799          * because the process might be triggered by ptlrpcd, and
800          * it is not good to block ptlrpcd thread (b=16006)*/
801         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
802                 req->rq_interpret_reply = osc_destroy_interpret;
803                 if (!osc_can_send_destroy(cli)) {
804                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
805                                                           NULL);
806
807                         /*
808                          * Wait until the number of on-going destroy RPCs drops
809                          * under max_rpc_in_flight
810                          */
811                         l_wait_event_exclusive(cli->cl_destroy_waitq,
812                                                osc_can_send_destroy(cli), &lwi);
813                 }
814         }
815
816         /* Do not wait for response */
817         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
818         RETURN(0);
819 }
820
821 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
822                                 long writing_bytes)
823 {
824         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
825
826         LASSERT(!(oa->o_valid & bits));
827
828         oa->o_valid |= bits;
829         client_obd_list_lock(&cli->cl_loi_list_lock);
830         oa->o_dirty = cli->cl_dirty;
831         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
832                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
833                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
834                 oa->o_undirty = 0;
835         } else if (cfs_atomic_read(&obd_dirty_pages) -
836                    cfs_atomic_read(&obd_dirty_transit_pages) >
837                    obd_max_dirty_pages + 1){
838                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
839                  * not covered by a lock thus they may safely race and trip
840                  * this CERROR() unless we add in a small fudge factor (+1). */
841                 CERROR("dirty %d - %d > system dirty_max %d\n",
842                        cfs_atomic_read(&obd_dirty_pages),
843                        cfs_atomic_read(&obd_dirty_transit_pages),
844                        obd_max_dirty_pages);
845                 oa->o_undirty = 0;
846         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
847                 CERROR("dirty %lu - dirty_max %lu too big???\n",
848                        cli->cl_dirty, cli->cl_dirty_max);
849                 oa->o_undirty = 0;
850         } else {
851                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
852                                 (cli->cl_max_rpcs_in_flight + 1);
853                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
854         }
855         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
856         oa->o_dropped = cli->cl_lost_grant;
857         cli->cl_lost_grant = 0;
858         client_obd_list_unlock(&cli->cl_loi_list_lock);
859         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
860                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
861
862 }
863
864 void osc_update_next_shrink(struct client_obd *cli)
865 {
866         cli->cl_next_shrink_grant =
867                 cfs_time_shift(cli->cl_grant_shrink_interval);
868         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
869                cli->cl_next_shrink_grant);
870 }
871
872 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
873 {
874         client_obd_list_lock(&cli->cl_loi_list_lock);
875         cli->cl_avail_grant += grant;
876         client_obd_list_unlock(&cli->cl_loi_list_lock);
877 }
878
879 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
880 {
881         if (body->oa.o_valid & OBD_MD_FLGRANT) {
882                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
883                 __osc_update_grant(cli, body->oa.o_grant);
884         }
885 }
886
887 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
888                               obd_count keylen, void *key, obd_count vallen,
889                               void *val, struct ptlrpc_request_set *set);
890
891 static int osc_shrink_grant_interpret(const struct lu_env *env,
892                                       struct ptlrpc_request *req,
893                                       void *aa, int rc)
894 {
895         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
896         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
897         struct ost_body *body;
898
899         if (rc != 0) {
900                 __osc_update_grant(cli, oa->o_grant);
901                 GOTO(out, rc);
902         }
903
904         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
905         LASSERT(body);
906         osc_update_grant(cli, body);
907 out:
908         OBDO_FREE(oa);
909         return rc;
910 }
911
912 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
913 {
914         client_obd_list_lock(&cli->cl_loi_list_lock);
915         oa->o_grant = cli->cl_avail_grant / 4;
916         cli->cl_avail_grant -= oa->o_grant;
917         client_obd_list_unlock(&cli->cl_loi_list_lock);
918         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
919                 oa->o_valid |= OBD_MD_FLFLAGS;
920                 oa->o_flags = 0;
921         }
922         oa->o_flags |= OBD_FL_SHRINK_GRANT;
923         osc_update_next_shrink(cli);
924 }
925
926 /* Shrink the current grant, either from some large amount to enough for a
927  * full set of in-flight RPCs, or if we have already shrunk to that limit
928  * then to enough for a single RPC.  This avoids keeping more grant than
929  * needed, and avoids shrinking the grant piecemeal. */
930 static int osc_shrink_grant(struct client_obd *cli)
931 {
932         long target = (cli->cl_max_rpcs_in_flight + 1) *
933                       cli->cl_max_pages_per_rpc;
934
935         client_obd_list_lock(&cli->cl_loi_list_lock);
936         if (cli->cl_avail_grant <= target)
937                 target = cli->cl_max_pages_per_rpc;
938         client_obd_list_unlock(&cli->cl_loi_list_lock);
939
940         return osc_shrink_grant_to_target(cli, target);
941 }
942
943 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
944 {
945         int    rc = 0;
946         struct ost_body     *body;
947         ENTRY;
948
949         client_obd_list_lock(&cli->cl_loi_list_lock);
950         /* Don't shrink if we are already above or below the desired limit
951          * We don't want to shrink below a single RPC, as that will negatively
952          * impact block allocation and long-term performance. */
953         if (target < cli->cl_max_pages_per_rpc)
954                 target = cli->cl_max_pages_per_rpc;
955
956         if (target >= cli->cl_avail_grant) {
957                 client_obd_list_unlock(&cli->cl_loi_list_lock);
958                 RETURN(0);
959         }
960         client_obd_list_unlock(&cli->cl_loi_list_lock);
961
962         OBD_ALLOC_PTR(body);
963         if (!body)
964                 RETURN(-ENOMEM);
965
966         osc_announce_cached(cli, &body->oa, 0);
967
968         client_obd_list_lock(&cli->cl_loi_list_lock);
969         body->oa.o_grant = cli->cl_avail_grant - target;
970         cli->cl_avail_grant = target;
971         client_obd_list_unlock(&cli->cl_loi_list_lock);
972         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
973                 body->oa.o_valid |= OBD_MD_FLFLAGS;
974                 body->oa.o_flags = 0;
975         }
976         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
977         osc_update_next_shrink(cli);
978
979         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
980                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
981                                 sizeof(*body), body, NULL);
982         if (rc != 0)
983                 __osc_update_grant(cli, body->oa.o_grant);
984         OBD_FREE_PTR(body);
985         RETURN(rc);
986 }
987
988 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
989 static int osc_should_shrink_grant(struct client_obd *client)
990 {
991         cfs_time_t time = cfs_time_current();
992         cfs_time_t next_shrink = client->cl_next_shrink_grant;
993
994         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
995              OBD_CONNECT_GRANT_SHRINK) == 0)
996                 return 0;
997
998         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
999                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1000                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1001                         return 1;
1002                 else
1003                         osc_update_next_shrink(client);
1004         }
1005         return 0;
1006 }
1007
1008 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1009 {
1010         struct client_obd *client;
1011
1012         cfs_list_for_each_entry(client, &item->ti_obd_list,
1013                                 cl_grant_shrink_list) {
1014                 if (osc_should_shrink_grant(client))
1015                         osc_shrink_grant(client);
1016         }
1017         return 0;
1018 }
1019
1020 static int osc_add_shrink_grant(struct client_obd *client)
1021 {
1022         int rc;
1023
1024         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1025                                        TIMEOUT_GRANT,
1026                                        osc_grant_shrink_grant_cb, NULL,
1027                                        &client->cl_grant_shrink_list);
1028         if (rc) {
1029                 CERROR("add grant client %s error %d\n",
1030                         client->cl_import->imp_obd->obd_name, rc);
1031                 return rc;
1032         }
1033         CDEBUG(D_CACHE, "add grant client %s \n",
1034                client->cl_import->imp_obd->obd_name);
1035         osc_update_next_shrink(client);
1036         return 0;
1037 }
1038
1039 static int osc_del_shrink_grant(struct client_obd *client)
1040 {
1041         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1042                                          TIMEOUT_GRANT);
1043 }
1044
1045 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1046 {
1047         /*
1048          * ocd_grant is the total grant amount we're expect to hold: if we've
1049          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1050          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1051          *
1052          * race is tolerable here: if we're evicted, but imp_state already
1053          * left EVICTED state, then cl_dirty must be 0 already.
1054          */
1055         client_obd_list_lock(&cli->cl_loi_list_lock);
1056         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1057                 cli->cl_avail_grant = ocd->ocd_grant;
1058         else
1059                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1060
1061         if (cli->cl_avail_grant < 0) {
1062                 CWARN("%s: available grant < 0, the OSS is probably not running"
1063                       " with patch from bug20278 (%ld) \n",
1064                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1065                 /* workaround for 1.6 servers which do not have
1066                  * the patch from bug20278 */
1067                 cli->cl_avail_grant = ocd->ocd_grant;
1068         }
1069
1070         /* determine the appropriate chunk size used by osc_extent. */
1071         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1072         client_obd_list_unlock(&cli->cl_loi_list_lock);
1073
1074         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1075                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1076                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1077
1078         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1079             cfs_list_empty(&cli->cl_grant_shrink_list))
1080                 osc_add_shrink_grant(cli);
1081 }
1082
1083 /* We assume that the reason this OSC got a short read is because it read
1084  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1085  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1086  * this stripe never got written at or beyond this stripe offset yet. */
1087 static void handle_short_read(int nob_read, obd_count page_count,
1088                               struct brw_page **pga)
1089 {
1090         char *ptr;
1091         int i = 0;
1092
1093         /* skip bytes read OK */
1094         while (nob_read > 0) {
1095                 LASSERT (page_count > 0);
1096
1097                 if (pga[i]->count > nob_read) {
1098                         /* EOF inside this page */
1099                         ptr = cfs_kmap(pga[i]->pg) +
1100                                 (pga[i]->off & ~CFS_PAGE_MASK);
1101                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1102                         cfs_kunmap(pga[i]->pg);
1103                         page_count--;
1104                         i++;
1105                         break;
1106                 }
1107
1108                 nob_read -= pga[i]->count;
1109                 page_count--;
1110                 i++;
1111         }
1112
1113         /* zero remaining pages */
1114         while (page_count-- > 0) {
1115                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1116                 memset(ptr, 0, pga[i]->count);
1117                 cfs_kunmap(pga[i]->pg);
1118                 i++;
1119         }
1120 }
1121
1122 static int check_write_rcs(struct ptlrpc_request *req,
1123                            int requested_nob, int niocount,
1124                            obd_count page_count, struct brw_page **pga)
1125 {
1126         int     i;
1127         __u32   *remote_rcs;
1128
1129         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1130                                                   sizeof(*remote_rcs) *
1131                                                   niocount);
1132         if (remote_rcs == NULL) {
1133                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1134                 return(-EPROTO);
1135         }
1136
1137         /* return error if any niobuf was in error */
1138         for (i = 0; i < niocount; i++) {
1139                 if ((int)remote_rcs[i] < 0)
1140                         return(remote_rcs[i]);
1141
1142                 if (remote_rcs[i] != 0) {
1143                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1144                                 i, remote_rcs[i], req);
1145                         return(-EPROTO);
1146                 }
1147         }
1148
1149         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1150                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1151                        req->rq_bulk->bd_nob_transferred, requested_nob);
1152                 return(-EPROTO);
1153         }
1154
1155         return (0);
1156 }
1157
1158 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1159 {
1160         if (p1->flag != p2->flag) {
1161                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1162                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1163
1164                 /* warn if we try to combine flags that we don't know to be
1165                  * safe to combine */
1166                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1167                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1168                               "report this at http://bugs.whamcloud.com/\n",
1169                               p1->flag, p2->flag);
1170                 }
1171                 return 0;
1172         }
1173
1174         return (p1->off + p1->count == p2->off);
1175 }
1176
1177 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1178                                    struct brw_page **pga, int opc,
1179                                    cksum_type_t cksum_type)
1180 {
1181         __u32                           cksum;
1182         int                             i = 0;
1183         struct cfs_crypto_hash_desc     *hdesc;
1184         unsigned int                    bufsize;
1185         int                             err;
1186         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1187
1188         LASSERT(pg_count > 0);
1189
1190         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1191         if (IS_ERR(hdesc)) {
1192                 CERROR("Unable to initialize checksum hash %s\n",
1193                        cfs_crypto_hash_name(cfs_alg));
1194                 return PTR_ERR(hdesc);
1195         }
1196
1197         while (nob > 0 && pg_count > 0) {
1198                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1199
1200                 /* corrupt the data before we compute the checksum, to
1201                  * simulate an OST->client data error */
1202                 if (i == 0 && opc == OST_READ &&
1203                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1204                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1205                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1206                         memcpy(ptr + off, "bad1", min(4, nob));
1207                         cfs_kunmap(pga[i]->pg);
1208                 }
1209                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1210                                   pga[i]->off & ~CFS_PAGE_MASK,
1211                                   count);
1212                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1213                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1214
1215                 nob -= pga[i]->count;
1216                 pg_count--;
1217                 i++;
1218         }
1219
1220         bufsize = 4;
1221         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1222
1223         if (err)
1224                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1225
1226         /* For sending we only compute the wrong checksum instead
1227          * of corrupting the data so it is still correct on a redo */
1228         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1229                 cksum++;
1230
1231         return cksum;
1232 }
1233
1234 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1235                                 struct lov_stripe_md *lsm, obd_count page_count,
1236                                 struct brw_page **pga,
1237                                 struct ptlrpc_request **reqp,
1238                                 struct obd_capa *ocapa, int reserve,
1239                                 int resend)
1240 {
1241         struct ptlrpc_request   *req;
1242         struct ptlrpc_bulk_desc *desc;
1243         struct ost_body         *body;
1244         struct obd_ioobj        *ioobj;
1245         struct niobuf_remote    *niobuf;
1246         int niocount, i, requested_nob, opc, rc;
1247         struct osc_brw_async_args *aa;
1248         struct req_capsule      *pill;
1249         struct brw_page *pg_prev;
1250
1251         ENTRY;
1252         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1253                 RETURN(-ENOMEM); /* Recoverable */
1254         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1255                 RETURN(-EINVAL); /* Fatal */
1256
1257         if ((cmd & OBD_BRW_WRITE) != 0) {
1258                 opc = OST_WRITE;
1259                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1260                                                 cli->cl_import->imp_rq_pool,
1261                                                 &RQF_OST_BRW_WRITE);
1262         } else {
1263                 opc = OST_READ;
1264                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1265         }
1266         if (req == NULL)
1267                 RETURN(-ENOMEM);
1268
1269         for (niocount = i = 1; i < page_count; i++) {
1270                 if (!can_merge_pages(pga[i - 1], pga[i]))
1271                         niocount++;
1272         }
1273
1274         pill = &req->rq_pill;
1275         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1276                              sizeof(*ioobj));
1277         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1278                              niocount * sizeof(*niobuf));
1279         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1280
1281         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1282         if (rc) {
1283                 ptlrpc_request_free(req);
1284                 RETURN(rc);
1285         }
1286         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1287         ptlrpc_at_set_req_timeout(req);
1288         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1289          * retry logic */
1290         req->rq_no_retry_einprogress = 1;
1291
1292         if (opc == OST_WRITE)
1293                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1294                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1295         else
1296                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1297                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1298
1299         if (desc == NULL)
1300                 GOTO(out, rc = -ENOMEM);
1301         /* NB request now owns desc and will free it when it gets freed */
1302
1303         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1304         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1305         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1306         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1307
1308         lustre_set_wire_obdo(&body->oa, oa);
1309
1310         obdo_to_ioobj(oa, ioobj);
1311         ioobj->ioo_bufcnt = niocount;
1312         osc_pack_capa(req, body, ocapa);
1313         LASSERT (page_count > 0);
1314         pg_prev = pga[0];
1315         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1316                 struct brw_page *pg = pga[i];
1317                 int poff = pg->off & ~CFS_PAGE_MASK;
1318
1319                 LASSERT(pg->count > 0);
1320                 /* make sure there is no gap in the middle of page array */
1321                 LASSERTF(page_count == 1 ||
1322                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1323                           ergo(i > 0 && i < page_count - 1,
1324                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1325                           ergo(i == page_count - 1, poff == 0)),
1326                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1327                          i, page_count, pg, pg->off, pg->count);
1328 #ifdef __linux__
1329                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1330                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1331                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1332                          i, page_count,
1333                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1334                          pg_prev->pg, page_private(pg_prev->pg),
1335                          pg_prev->pg->index, pg_prev->off);
1336 #else
1337                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1338                          "i %d p_c %u\n", i, page_count);
1339 #endif
1340                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1341                         (pg->flag & OBD_BRW_SRVLOCK));
1342
1343                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1344                 requested_nob += pg->count;
1345
1346                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1347                         niobuf--;
1348                         niobuf->len += pg->count;
1349                 } else {
1350                         niobuf->offset = pg->off;
1351                         niobuf->len    = pg->count;
1352                         niobuf->flags  = pg->flag;
1353                 }
1354                 pg_prev = pg;
1355         }
1356
1357         LASSERTF((void *)(niobuf - niocount) ==
1358                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1359                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1360                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1361
1362         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1363         if (resend) {
1364                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1365                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1366                         body->oa.o_flags = 0;
1367                 }
1368                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1369         }
1370
1371         if (osc_should_shrink_grant(cli))
1372                 osc_shrink_grant_local(cli, &body->oa);
1373
1374         /* size[REQ_REC_OFF] still sizeof (*body) */
1375         if (opc == OST_WRITE) {
1376                 if (cli->cl_checksum &&
1377                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1378                         /* store cl_cksum_type in a local variable since
1379                          * it can be changed via lprocfs */
1380                         cksum_type_t cksum_type = cli->cl_cksum_type;
1381
1382                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1383                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1384                                 body->oa.o_flags = 0;
1385                         }
1386                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1387                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1388                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1389                                                              page_count, pga,
1390                                                              OST_WRITE,
1391                                                              cksum_type);
1392                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1393                                body->oa.o_cksum);
1394                         /* save this in 'oa', too, for later checking */
1395                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1396                         oa->o_flags |= cksum_type_pack(cksum_type);
1397                 } else {
1398                         /* clear out the checksum flag, in case this is a
1399                          * resend but cl_checksum is no longer set. b=11238 */
1400                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1401                 }
1402                 oa->o_cksum = body->oa.o_cksum;
1403                 /* 1 RC per niobuf */
1404                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1405                                      sizeof(__u32) * niocount);
1406         } else {
1407                 if (cli->cl_checksum &&
1408                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1409                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1410                                 body->oa.o_flags = 0;
1411                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1412                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1413                 }
1414         }
1415         ptlrpc_request_set_replen(req);
1416
1417         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1418         aa = ptlrpc_req_async_args(req);
1419         aa->aa_oa = oa;
1420         aa->aa_requested_nob = requested_nob;
1421         aa->aa_nio_count = niocount;
1422         aa->aa_page_count = page_count;
1423         aa->aa_resends = 0;
1424         aa->aa_ppga = pga;
1425         aa->aa_cli = cli;
1426         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1427         if (ocapa && reserve)
1428                 aa->aa_ocapa = capa_get(ocapa);
1429
1430         *reqp = req;
1431         RETURN(0);
1432
1433  out:
1434         ptlrpc_req_finished(req);
1435         RETURN(rc);
1436 }
1437
1438 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1439                                 __u32 client_cksum, __u32 server_cksum, int nob,
1440                                 obd_count page_count, struct brw_page **pga,
1441                                 cksum_type_t client_cksum_type)
1442 {
1443         __u32 new_cksum;
1444         char *msg;
1445         cksum_type_t cksum_type;
1446
1447         if (server_cksum == client_cksum) {
1448                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1449                 return 0;
1450         }
1451
1452         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1453                                        oa->o_flags : 0);
1454         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1455                                       cksum_type);
1456
1457         if (cksum_type != client_cksum_type)
1458                 msg = "the server did not use the checksum type specified in "
1459                       "the original request - likely a protocol problem";
1460         else if (new_cksum == server_cksum)
1461                 msg = "changed on the client after we checksummed it - "
1462                       "likely false positive due to mmap IO (bug 11742)";
1463         else if (new_cksum == client_cksum)
1464                 msg = "changed in transit before arrival at OST";
1465         else
1466                 msg = "changed in transit AND doesn't match the original - "
1467                       "likely false positive due to mmap IO (bug 11742)";
1468
1469         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1470                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1471                            msg, libcfs_nid2str(peer->nid),
1472                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1473                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1474                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1475                            oa->o_id,
1476                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1477                            pga[0]->off,
1478                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1479         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1480                "client csum now %x\n", client_cksum, client_cksum_type,
1481                server_cksum, cksum_type, new_cksum);
1482         return 1;
1483 }
1484
1485 /* Note rc enters this function as number of bytes transferred */
1486 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1487 {
1488         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1489         const lnet_process_id_t *peer =
1490                         &req->rq_import->imp_connection->c_peer;
1491         struct client_obd *cli = aa->aa_cli;
1492         struct ost_body *body;
1493         __u32 client_cksum = 0;
1494         ENTRY;
1495
1496         if (rc < 0 && rc != -EDQUOT) {
1497                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1498                 RETURN(rc);
1499         }
1500
1501         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1502         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1503         if (body == NULL) {
1504                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1505                 RETURN(-EPROTO);
1506         }
1507
1508         /* set/clear over quota flag for a uid/gid */
1509         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1510             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1511                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1512
1513                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1514                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1515                        body->oa.o_flags);
1516                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1517         }
1518
1519         osc_update_grant(cli, body);
1520
1521         if (rc < 0)
1522                 RETURN(rc);
1523
1524         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1525                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1526
1527         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1528                 if (rc > 0) {
1529                         CERROR("Unexpected +ve rc %d\n", rc);
1530                         RETURN(-EPROTO);
1531                 }
1532                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1533
1534                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1535                         RETURN(-EAGAIN);
1536
1537                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1538                     check_write_checksum(&body->oa, peer, client_cksum,
1539                                          body->oa.o_cksum, aa->aa_requested_nob,
1540                                          aa->aa_page_count, aa->aa_ppga,
1541                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1542                         RETURN(-EAGAIN);
1543
1544                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1545                                      aa->aa_page_count, aa->aa_ppga);
1546                 GOTO(out, rc);
1547         }
1548
1549         /* The rest of this function executes only for OST_READs */
1550
1551         /* if unwrap_bulk failed, return -EAGAIN to retry */
1552         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1553         if (rc < 0)
1554                 GOTO(out, rc = -EAGAIN);
1555
1556         if (rc > aa->aa_requested_nob) {
1557                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1558                        aa->aa_requested_nob);
1559                 RETURN(-EPROTO);
1560         }
1561
1562         if (rc != req->rq_bulk->bd_nob_transferred) {
1563                 CERROR ("Unexpected rc %d (%d transferred)\n",
1564                         rc, req->rq_bulk->bd_nob_transferred);
1565                 return (-EPROTO);
1566         }
1567
1568         if (rc < aa->aa_requested_nob)
1569                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1570
1571         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1572                 static int cksum_counter;
1573                 __u32      server_cksum = body->oa.o_cksum;
1574                 char      *via;
1575                 char      *router;
1576                 cksum_type_t cksum_type;
1577
1578                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1579                                                body->oa.o_flags : 0);
1580                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1581                                                  aa->aa_ppga, OST_READ,
1582                                                  cksum_type);
1583
1584                 if (peer->nid == req->rq_bulk->bd_sender) {
1585                         via = router = "";
1586                 } else {
1587                         via = " via ";
1588                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1589                 }
1590
1591                 if (server_cksum == ~0 && rc > 0) {
1592                         CERROR("Protocol error: server %s set the 'checksum' "
1593                                "bit, but didn't send a checksum.  Not fatal, "
1594                                "but please notify on http://bugs.whamcloud.com/\n",
1595                                libcfs_nid2str(peer->nid));
1596                 } else if (server_cksum != client_cksum) {
1597                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1598                                            "%s%s%s inode "DFID" object "
1599                                            LPU64"/"LPU64" extent "
1600                                            "["LPU64"-"LPU64"]\n",
1601                                            req->rq_import->imp_obd->obd_name,
1602                                            libcfs_nid2str(peer->nid),
1603                                            via, router,
1604                                            body->oa.o_valid & OBD_MD_FLFID ?
1605                                                 body->oa.o_parent_seq : (__u64)0,
1606                                            body->oa.o_valid & OBD_MD_FLFID ?
1607                                                 body->oa.o_parent_oid : 0,
1608                                            body->oa.o_valid & OBD_MD_FLFID ?
1609                                                 body->oa.o_parent_ver : 0,
1610                                            body->oa.o_id,
1611                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1612                                                 body->oa.o_seq : (__u64)0,
1613                                            aa->aa_ppga[0]->off,
1614                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1615                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1616                                                                         1);
1617                         CERROR("client %x, server %x, cksum_type %x\n",
1618                                client_cksum, server_cksum, cksum_type);
1619                         cksum_counter = 0;
1620                         aa->aa_oa->o_cksum = client_cksum;
1621                         rc = -EAGAIN;
1622                 } else {
1623                         cksum_counter++;
1624                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1625                         rc = 0;
1626                 }
1627         } else if (unlikely(client_cksum)) {
1628                 static int cksum_missed;
1629
1630                 cksum_missed++;
1631                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1632                         CERROR("Checksum %u requested from %s but not sent\n",
1633                                cksum_missed, libcfs_nid2str(peer->nid));
1634         } else {
1635                 rc = 0;
1636         }
1637 out:
1638         if (rc >= 0)
1639                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1640
1641         RETURN(rc);
1642 }
1643
1644 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1645                             struct lov_stripe_md *lsm,
1646                             obd_count page_count, struct brw_page **pga,
1647                             struct obd_capa *ocapa)
1648 {
1649         struct ptlrpc_request *req;
1650         int                    rc;
1651         cfs_waitq_t            waitq;
1652         int                    generation, resends = 0;
1653         struct l_wait_info     lwi;
1654
1655         ENTRY;
1656
1657         cfs_waitq_init(&waitq);
1658         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1659
1660 restart_bulk:
1661         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1662                                   page_count, pga, &req, ocapa, 0, resends);
1663         if (rc != 0)
1664                 return (rc);
1665
1666         if (resends) {
1667                 req->rq_generation_set = 1;
1668                 req->rq_import_generation = generation;
1669                 req->rq_sent = cfs_time_current_sec() + resends;
1670         }
1671
1672         rc = ptlrpc_queue_wait(req);
1673
1674         if (rc == -ETIMEDOUT && req->rq_resend) {
1675                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1676                 ptlrpc_req_finished(req);
1677                 goto restart_bulk;
1678         }
1679
1680         rc = osc_brw_fini_request(req, rc);
1681
1682         ptlrpc_req_finished(req);
1683         /* When server return -EINPROGRESS, client should always retry
1684          * regardless of the number of times the bulk was resent already.*/
1685         if (osc_recoverable_error(rc)) {
1686                 resends++;
1687                 if (rc != -EINPROGRESS &&
1688                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1689                         CERROR("%s: too many resend retries for object: "
1690                                ""LPU64":"LPU64", rc = %d.\n",
1691                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1692                         goto out;
1693                 }
1694                 if (generation !=
1695                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1696                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1697                                ""LPU64":"LPU64", rc = %d.\n",
1698                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1699                         goto out;
1700                 }
1701
1702                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1703                                        NULL);
1704                 l_wait_event(waitq, 0, &lwi);
1705
1706                 goto restart_bulk;
1707         }
1708 out:
1709         if (rc == -EAGAIN || rc == -EINPROGRESS)
1710                 rc = -EIO;
1711         RETURN (rc);
1712 }
1713
1714 static int osc_brw_redo_request(struct ptlrpc_request *request,
1715                                 struct osc_brw_async_args *aa, int rc)
1716 {
1717         struct ptlrpc_request *new_req;
1718         struct osc_brw_async_args *new_aa;
1719         struct osc_async_page *oap;
1720         ENTRY;
1721
1722         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1723                   "redo for recoverable error %d", rc);
1724
1725         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1726                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1727                                   aa->aa_cli, aa->aa_oa,
1728                                   NULL /* lsm unused by osc currently */,
1729                                   aa->aa_page_count, aa->aa_ppga,
1730                                   &new_req, aa->aa_ocapa, 0, 1);
1731         if (rc)
1732                 RETURN(rc);
1733
1734         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1735                 if (oap->oap_request != NULL) {
1736                         LASSERTF(request == oap->oap_request,
1737                                  "request %p != oap_request %p\n",
1738                                  request, oap->oap_request);
1739                         if (oap->oap_interrupted) {
1740                                 ptlrpc_req_finished(new_req);
1741                                 RETURN(-EINTR);
1742                         }
1743                 }
1744         }
1745         /* New request takes over pga and oaps from old request.
1746          * Note that copying a list_head doesn't work, need to move it... */
1747         aa->aa_resends++;
1748         new_req->rq_interpret_reply = request->rq_interpret_reply;
1749         new_req->rq_async_args = request->rq_async_args;
1750         /* cap resend delay to the current request timeout, this is similar to
1751          * what ptlrpc does (see after_reply()) */
1752         if (aa->aa_resends > new_req->rq_timeout)
1753                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1754         else
1755                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1756         new_req->rq_generation_set = 1;
1757         new_req->rq_import_generation = request->rq_import_generation;
1758
1759         new_aa = ptlrpc_req_async_args(new_req);
1760
1761         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1762         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1763         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1764         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1765
1766         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1767                 if (oap->oap_request) {
1768                         ptlrpc_req_finished(oap->oap_request);
1769                         oap->oap_request = ptlrpc_request_addref(new_req);
1770                 }
1771         }
1772
1773         new_aa->aa_ocapa = aa->aa_ocapa;
1774         aa->aa_ocapa = NULL;
1775
1776         /* XXX: This code will run into problem if we're going to support
1777          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1778          * and wait for all of them to be finished. We should inherit request
1779          * set from old request. */
1780         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1781
1782         DEBUG_REQ(D_INFO, new_req, "new request");
1783         RETURN(0);
1784 }
1785
1786 /*
1787  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1788  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1789  * fine for our small page arrays and doesn't require allocation.  its an
1790  * insertion sort that swaps elements that are strides apart, shrinking the
1791  * stride down until its '1' and the array is sorted.
1792  */
1793 static void sort_brw_pages(struct brw_page **array, int num)
1794 {
1795         int stride, i, j;
1796         struct brw_page *tmp;
1797
1798         if (num == 1)
1799                 return;
1800         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1801                 ;
1802
1803         do {
1804                 stride /= 3;
1805                 for (i = stride ; i < num ; i++) {
1806                         tmp = array[i];
1807                         j = i;
1808                         while (j >= stride && array[j - stride]->off > tmp->off) {
1809                                 array[j] = array[j - stride];
1810                                 j -= stride;
1811                         }
1812                         array[j] = tmp;
1813                 }
1814         } while (stride > 1);
1815 }
1816
1817 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1818 {
1819         int count = 1;
1820         int offset;
1821         int i = 0;
1822
1823         LASSERT (pages > 0);
1824         offset = pg[i]->off & ~CFS_PAGE_MASK;
1825
1826         for (;;) {
1827                 pages--;
1828                 if (pages == 0)         /* that's all */
1829                         return count;
1830
1831                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1832                         return count;   /* doesn't end on page boundary */
1833
1834                 i++;
1835                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1836                 if (offset != 0)        /* doesn't start on page boundary */
1837                         return count;
1838
1839                 count++;
1840         }
1841 }
1842
1843 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1844 {
1845         struct brw_page **ppga;
1846         int i;
1847
1848         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1849         if (ppga == NULL)
1850                 return NULL;
1851
1852         for (i = 0; i < count; i++)
1853                 ppga[i] = pga + i;
1854         return ppga;
1855 }
1856
1857 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1858 {
1859         LASSERT(ppga != NULL);
1860         OBD_FREE(ppga, sizeof(*ppga) * count);
1861 }
1862
1863 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1864                    obd_count page_count, struct brw_page *pga,
1865                    struct obd_trans_info *oti)
1866 {
1867         struct obdo *saved_oa = NULL;
1868         struct brw_page **ppga, **orig;
1869         struct obd_import *imp = class_exp2cliimp(exp);
1870         struct client_obd *cli;
1871         int rc, page_count_orig;
1872         ENTRY;
1873
1874         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1875         cli = &imp->imp_obd->u.cli;
1876
1877         if (cmd & OBD_BRW_CHECK) {
1878                 /* The caller just wants to know if there's a chance that this
1879                  * I/O can succeed */
1880
1881                 if (imp->imp_invalid)
1882                         RETURN(-EIO);
1883                 RETURN(0);
1884         }
1885
1886         /* test_brw with a failed create can trip this, maybe others. */
1887         LASSERT(cli->cl_max_pages_per_rpc);
1888
1889         rc = 0;
1890
1891         orig = ppga = osc_build_ppga(pga, page_count);
1892         if (ppga == NULL)
1893                 RETURN(-ENOMEM);
1894         page_count_orig = page_count;
1895
1896         sort_brw_pages(ppga, page_count);
1897         while (page_count) {
1898                 obd_count pages_per_brw;
1899
1900                 if (page_count > cli->cl_max_pages_per_rpc)
1901                         pages_per_brw = cli->cl_max_pages_per_rpc;
1902                 else
1903                         pages_per_brw = page_count;
1904
1905                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1906
1907                 if (saved_oa != NULL) {
1908                         /* restore previously saved oa */
1909                         *oinfo->oi_oa = *saved_oa;
1910                 } else if (page_count > pages_per_brw) {
1911                         /* save a copy of oa (brw will clobber it) */
1912                         OBDO_ALLOC(saved_oa);
1913                         if (saved_oa == NULL)
1914                                 GOTO(out, rc = -ENOMEM);
1915                         *saved_oa = *oinfo->oi_oa;
1916                 }
1917
1918                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1919                                       pages_per_brw, ppga, oinfo->oi_capa);
1920
1921                 if (rc != 0)
1922                         break;
1923
1924                 page_count -= pages_per_brw;
1925                 ppga += pages_per_brw;
1926         }
1927
1928 out:
1929         osc_release_ppga(orig, page_count_orig);
1930
1931         if (saved_oa != NULL)
1932                 OBDO_FREE(saved_oa);
1933
1934         RETURN(rc);
1935 }
1936
1937 static int brw_interpret(const struct lu_env *env,
1938                          struct ptlrpc_request *req, void *data, int rc)
1939 {
1940         struct osc_brw_async_args *aa = data;
1941         struct osc_extent *ext;
1942         struct osc_extent *tmp;
1943         struct cl_object  *obj = NULL;
1944         struct client_obd *cli = aa->aa_cli;
1945         ENTRY;
1946
1947         rc = osc_brw_fini_request(req, rc);
1948         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1949         /* When server return -EINPROGRESS, client should always retry
1950          * regardless of the number of times the bulk was resent already. */
1951         if (osc_recoverable_error(rc)) {
1952                 if (req->rq_import_generation !=
1953                     req->rq_import->imp_generation) {
1954                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1955                                ""LPU64":"LPU64", rc = %d.\n",
1956                                req->rq_import->imp_obd->obd_name,
1957                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1958                 } else if (rc == -EINPROGRESS ||
1959                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1960                         rc = osc_brw_redo_request(req, aa, rc);
1961                 } else {
1962                         CERROR("%s: too many resent retries for object: "
1963                                ""LPU64":"LPU64", rc = %d.\n",
1964                                req->rq_import->imp_obd->obd_name,
1965                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1966                 }
1967
1968                 if (rc == 0)
1969                         RETURN(0);
1970                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1971                         rc = -EIO;
1972         }
1973
1974         if (aa->aa_ocapa) {
1975                 capa_put(aa->aa_ocapa);
1976                 aa->aa_ocapa = NULL;
1977         }
1978
1979         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1980                 if (obj == NULL && rc == 0) {
1981                         obj = osc2cl(ext->oe_obj);
1982                         cl_object_get(obj);
1983                 }
1984
1985                 cfs_list_del_init(&ext->oe_link);
1986                 osc_extent_finish(env, ext, 1, rc);
1987         }
1988         LASSERT(cfs_list_empty(&aa->aa_exts));
1989         LASSERT(cfs_list_empty(&aa->aa_oaps));
1990
1991         if (obj != NULL) {
1992                 struct obdo *oa = aa->aa_oa;
1993                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1994                 unsigned long valid = 0;
1995
1996                 LASSERT(rc == 0);
1997                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1998                         attr->cat_blocks = oa->o_blocks;
1999                         valid |= CAT_BLOCKS;
2000                 }
2001                 if (oa->o_valid & OBD_MD_FLMTIME) {
2002                         attr->cat_mtime = oa->o_mtime;
2003                         valid |= CAT_MTIME;
2004                 }
2005                 if (oa->o_valid & OBD_MD_FLATIME) {
2006                         attr->cat_atime = oa->o_atime;
2007                         valid |= CAT_ATIME;
2008                 }
2009                 if (oa->o_valid & OBD_MD_FLCTIME) {
2010                         attr->cat_ctime = oa->o_ctime;
2011                         valid |= CAT_CTIME;
2012                 }
2013                 if (valid != 0) {
2014                         cl_object_attr_lock(obj);
2015                         cl_object_attr_set(env, obj, attr, valid);
2016                         cl_object_attr_unlock(obj);
2017                 }
2018                 cl_object_put(env, obj);
2019         }
2020         OBDO_FREE(aa->aa_oa);
2021
2022         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2023                           req->rq_bulk->bd_nob_transferred);
2024         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2025         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2026
2027         client_obd_list_lock(&cli->cl_loi_list_lock);
2028         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2029          * is called so we know whether to go to sync BRWs or wait for more
2030          * RPCs to complete */
2031         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2032                 cli->cl_w_in_flight--;
2033         else
2034                 cli->cl_r_in_flight--;
2035         osc_wake_cache_waiters(cli);
2036         client_obd_list_unlock(&cli->cl_loi_list_lock);
2037
2038         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2039         RETURN(rc);
2040 }
2041
2042 /**
2043  * Build an RPC by the list of extent @ext_list. The caller must ensure
2044  * that the total pages in this list are NOT over max pages per RPC.
2045  * Extents in the list must be in OES_RPC state.
2046  */
2047 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2048                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2049 {
2050         struct ptlrpc_request *req = NULL;
2051         struct osc_extent *ext;
2052         CFS_LIST_HEAD(rpc_list);
2053         struct brw_page **pga = NULL;
2054         struct osc_brw_async_args *aa = NULL;
2055         struct obdo *oa = NULL;
2056         struct osc_async_page *oap;
2057         struct osc_async_page *tmp;
2058         struct cl_req *clerq = NULL;
2059         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2060         struct ldlm_lock *lock = NULL;
2061         struct cl_req_attr crattr;
2062         obd_off starting_offset = OBD_OBJECT_EOF;
2063         obd_off ending_offset = 0;
2064         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2065
2066         ENTRY;
2067         LASSERT(!cfs_list_empty(ext_list));
2068
2069         /* add pages into rpc_list to build BRW rpc */
2070         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2071                 LASSERT(ext->oe_state == OES_RPC);
2072                 mem_tight |= ext->oe_memalloc;
2073                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2074                         ++page_count;
2075                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2076                         if (starting_offset > oap->oap_obj_off)
2077                                 starting_offset = oap->oap_obj_off;
2078                         else
2079                                 LASSERT(oap->oap_page_off == 0);
2080                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2081                                 ending_offset = oap->oap_obj_off +
2082                                                 oap->oap_count;
2083                         else
2084                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2085                                         CFS_PAGE_SIZE);
2086                 }
2087         }
2088
2089         if (mem_tight)
2090                 mpflag = cfs_memory_pressure_get_and_set();
2091
2092         memset(&crattr, 0, sizeof crattr);
2093         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2094         if (pga == NULL)
2095                 GOTO(out, rc = -ENOMEM);
2096
2097         OBDO_ALLOC(oa);
2098         if (oa == NULL)
2099                 GOTO(out, rc = -ENOMEM);
2100
2101         i = 0;
2102         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2103                 struct cl_page *page = oap2cl_page(oap);
2104                 if (clerq == NULL) {
2105                         clerq = cl_req_alloc(env, page, crt,
2106                                              1 /* only 1-object rpcs for
2107                                                 * now */);
2108                         if (IS_ERR(clerq))
2109                                 GOTO(out, rc = PTR_ERR(clerq));
2110                         lock = oap->oap_ldlm_lock;
2111                 }
2112                 if (mem_tight)
2113                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2114                 pga[i] = &oap->oap_brw_page;
2115                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2116                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2117                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2118                 i++;
2119                 cl_req_page_add(env, clerq, page);
2120         }
2121
2122         /* always get the data for the obdo for the rpc */
2123         LASSERT(clerq != NULL);
2124         crattr.cra_oa = oa;
2125         crattr.cra_capa = NULL;
2126         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2127         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2128         if (lock) {
2129                 oa->o_handle = lock->l_remote_handle;
2130                 oa->o_valid |= OBD_MD_FLHANDLE;
2131         }
2132
2133         rc = cl_req_prep(env, clerq);
2134         if (rc != 0) {
2135                 CERROR("cl_req_prep failed: %d\n", rc);
2136                 GOTO(out, rc);
2137         }
2138
2139         sort_brw_pages(pga, page_count);
2140         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2141                         pga, &req, crattr.cra_capa, 1, 0);
2142         if (rc != 0) {
2143                 CERROR("prep_req failed: %d\n", rc);
2144                 GOTO(out, rc);
2145         }
2146
2147         req->rq_interpret_reply = brw_interpret;
2148         if (mem_tight != 0)
2149                 req->rq_memalloc = 1;
2150
2151         /* Need to update the timestamps after the request is built in case
2152          * we race with setattr (locally or in queue at OST).  If OST gets
2153          * later setattr before earlier BRW (as determined by the request xid),
2154          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2155          * way to do this in a single call.  bug 10150 */
2156         cl_req_attr_set(env, clerq, &crattr,
2157                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2158
2159         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2160
2161         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2162         aa = ptlrpc_req_async_args(req);
2163         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2164         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2165         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2166         cfs_list_splice_init(ext_list, &aa->aa_exts);
2167         aa->aa_clerq = clerq;
2168
2169         /* queued sync pages can be torn down while the pages
2170          * were between the pending list and the rpc */
2171         tmp = NULL;
2172         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2173                 /* only one oap gets a request reference */
2174                 if (tmp == NULL)
2175                         tmp = oap;
2176                 if (oap->oap_interrupted && !req->rq_intr) {
2177                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2178                                         oap, req);
2179                         ptlrpc_mark_interrupted(req);
2180                 }
2181         }
2182         if (tmp != NULL)
2183                 tmp->oap_request = ptlrpc_request_addref(req);
2184
2185         client_obd_list_lock(&cli->cl_loi_list_lock);
2186         starting_offset >>= CFS_PAGE_SHIFT;
2187         if (cmd == OBD_BRW_READ) {
2188                 cli->cl_r_in_flight++;
2189                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2190                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2191                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2192                                       starting_offset + 1);
2193         } else {
2194                 cli->cl_w_in_flight++;
2195                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2196                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2197                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2198                                       starting_offset + 1);
2199         }
2200         client_obd_list_unlock(&cli->cl_loi_list_lock);
2201
2202         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2203                   page_count, aa, cli->cl_r_in_flight,
2204                   cli->cl_w_in_flight);
2205
2206         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2207          * see which CPU/NUMA node the majority of pages were allocated
2208          * on, and try to assign the async RPC to the CPU core
2209          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2210          *
2211          * But on the other hand, we expect that multiple ptlrpcd
2212          * threads and the initial write sponsor can run in parallel,
2213          * especially when data checksum is enabled, which is CPU-bound
2214          * operation and single ptlrpcd thread cannot process in time.
2215          * So more ptlrpcd threads sharing BRW load
2216          * (with PDL_POLICY_ROUND) seems better.
2217          */
2218         ptlrpcd_add_req(req, pol, -1);
2219         rc = 0;
2220         EXIT;
2221
2222 out:
2223         if (mem_tight != 0)
2224                 cfs_memory_pressure_restore(mpflag);
2225
2226         capa_put(crattr.cra_capa);
2227         if (rc != 0) {
2228                 LASSERT(req == NULL);
2229
2230                 if (oa)
2231                         OBDO_FREE(oa);
2232                 if (pga)
2233                         OBD_FREE(pga, sizeof(*pga) * page_count);
2234                 /* this should happen rarely and is pretty bad, it makes the
2235                  * pending list not follow the dirty order */
2236                 while (!cfs_list_empty(ext_list)) {
2237                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2238                                              oe_link);
2239                         cfs_list_del_init(&ext->oe_link);
2240                         osc_extent_finish(env, ext, 0, rc);
2241                 }
2242                 if (clerq && !IS_ERR(clerq))
2243                         cl_req_completion(env, clerq, rc);
2244         }
2245         RETURN(rc);
2246 }
2247
2248 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2249                                         struct ldlm_enqueue_info *einfo)
2250 {
2251         void *data = einfo->ei_cbdata;
2252         int set = 0;
2253
2254         LASSERT(lock != NULL);
2255         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2256         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2257         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2258         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2259
2260         lock_res_and_lock(lock);
2261         spin_lock(&osc_ast_guard);
2262
2263         if (lock->l_ast_data == NULL)
2264                 lock->l_ast_data = data;
2265         if (lock->l_ast_data == data)
2266                 set = 1;
2267
2268         spin_unlock(&osc_ast_guard);
2269         unlock_res_and_lock(lock);
2270
2271         return set;
2272 }
2273
2274 static int osc_set_data_with_check(struct lustre_handle *lockh,
2275                                    struct ldlm_enqueue_info *einfo)
2276 {
2277         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2278         int set = 0;
2279
2280         if (lock != NULL) {
2281                 set = osc_set_lock_data_with_check(lock, einfo);
2282                 LDLM_LOCK_PUT(lock);
2283         } else
2284                 CERROR("lockh %p, data %p - client evicted?\n",
2285                        lockh, einfo->ei_cbdata);
2286         return set;
2287 }
2288
2289 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2290                              ldlm_iterator_t replace, void *data)
2291 {
2292         struct ldlm_res_id res_id;
2293         struct obd_device *obd = class_exp2obd(exp);
2294
2295         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2296         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2297         return 0;
2298 }
2299
2300 /* find any ldlm lock of the inode in osc
2301  * return 0    not find
2302  *        1    find one
2303  *      < 0    error */
2304 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2305                            ldlm_iterator_t replace, void *data)
2306 {
2307         struct ldlm_res_id res_id;
2308         struct obd_device *obd = class_exp2obd(exp);
2309         int rc = 0;
2310
2311         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2312         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2313         if (rc == LDLM_ITER_STOP)
2314                 return(1);
2315         if (rc == LDLM_ITER_CONTINUE)
2316                 return(0);
2317         return(rc);
2318 }
2319
2320 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2321                             obd_enqueue_update_f upcall, void *cookie,
2322                             __u64 *flags, int agl, int rc)
2323 {
2324         int intent = *flags & LDLM_FL_HAS_INTENT;
2325         ENTRY;
2326
2327         if (intent) {
2328                 /* The request was created before ldlm_cli_enqueue call. */
2329                 if (rc == ELDLM_LOCK_ABORTED) {
2330                         struct ldlm_reply *rep;
2331                         rep = req_capsule_server_get(&req->rq_pill,
2332                                                      &RMF_DLM_REP);
2333
2334                         LASSERT(rep != NULL);
2335                         if (rep->lock_policy_res1)
2336                                 rc = rep->lock_policy_res1;
2337                 }
2338         }
2339
2340         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2341             (rc == 0)) {
2342                 *flags |= LDLM_FL_LVB_READY;
2343                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2344                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2345         }
2346
2347         /* Call the update callback. */
2348         rc = (*upcall)(cookie, rc);
2349         RETURN(rc);
2350 }
2351
/**
 * Reply interpreter installed by osc_enqueue_base() on asynchronous enqueue
 * requests (it is stored in req->rq_interpret_reply there).
 *
 * It completes the ldlm side of the enqueue with ldlm_cli_enqueue_fini(),
 * then the osc side with osc_enqueue_fini(), which runs the caller's upcall.
 * The statement order matters: an extra lock reference is taken before
 * ldlm_cli_enqueue_fini() and dropped only after the upcall has run.
 *
 * \param env  unused here
 * \param req  the enqueue request whose reply is being interpreted
 * \param aa   arguments stashed in the request by osc_enqueue_base()
 * \param rc   status of the request so far
 *
 * \return the value returned by osc_enqueue_fini(), i.e. by the upcall
 */
2352 static int osc_enqueue_interpret(const struct lu_env *env,
2353                                  struct ptlrpc_request *req,
2354                                  struct osc_enqueue_args *aa, int rc)
2355 {
2356         struct ldlm_lock *lock;
2357         struct lustre_handle handle;
2358         __u32 mode;
2359         struct ost_lvb *lvb;
2360         __u32 lvb_len;
2361         __u64 *flags = aa->oa_flags;
2362
2363         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2364          * might be freed anytime after lock upcall has been called. */
2365         lustre_handle_copy(&handle, aa->oa_lockh);
2366         mode = aa->oa_ei->ei_mode;
2367
2368         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2369          * be valid. */
             /* NOTE(review): the result is only checked by the LASSERTF near
              * the end of the function, after the handle has been used. */
2370         lock = ldlm_handle2lock(&handle);
2371
2372         /* Take an additional reference so that a blocking AST that
2373          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2374          * to arrive after an upcall has been executed by
2375          * osc_enqueue_fini(). */
2376         ldlm_lock_addref(&handle, mode);
2377
2378         /* Let CP AST to grant the lock first. */
2379         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2380
             /* An aborted AGL enqueue passes no LVB buffer to
              * ldlm_cli_enqueue_fini(); every other case passes the caller's
              * buffer and its size. */
2381         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2382                 lvb = NULL;
2383                 lvb_len = 0;
2384         } else {
2385                 lvb = aa->oa_lvb;
2386                 lvb_len = sizeof(*aa->oa_lvb);
2387         }
2388
2389         /* Complete obtaining the lock procedure. */
2390         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2391                                    mode, flags, lvb, lvb_len, &handle, rc);
2392         /* Complete osc stuff. */
             /* osc_enqueue_fini() is always given aa->oa_lvb, even when NULL
              * was passed to ldlm_cli_enqueue_fini() above. */
2393         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2394                               flags, aa->oa_agl, rc);
2395
2396         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2397
2398         /* Release the lock for async request. */
2399         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2400                 /*
2401                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2402                  * not already released by
2403                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2404                  */
2405                 ldlm_lock_decref(&handle, mode);
2406
2407         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2408                  aa->oa_lockh, req, aa);
             /* Drop the additional reference taken by ldlm_lock_addref()
              * above, then the reference from ldlm_handle2lock(). */
2409         ldlm_lock_decref(&handle, mode);
2410         LDLM_LOCK_PUT(lock);
2411         return rc;
2412 }
2413
2414 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2415                         struct lov_oinfo *loi, int flags,
2416                         struct ost_lvb *lvb, __u32 mode, int rc)
2417 {
2418         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2419
2420         if (rc == ELDLM_OK) {
2421                 __u64 tmp;
2422
2423                 LASSERT(lock != NULL);
2424                 loi->loi_lvb = *lvb;
2425                 tmp = loi->loi_lvb.lvb_size;
2426                 /* Extend KMS up to the end of this lock and no further
2427                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2428                 if (tmp > lock->l_policy_data.l_extent.end)
2429                         tmp = lock->l_policy_data.l_extent.end + 1;
2430                 if (tmp >= loi->loi_kms) {
2431                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2432                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2433                         loi_kms_set(loi, tmp);
2434                 } else {
2435                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2436                                    LPU64"; leaving kms="LPU64", end="LPU64,
2437                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2438                                    lock->l_policy_data.l_extent.end);
2439                 }
2440                 ldlm_lock_allow_match(lock);
2441         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2442                 LASSERT(lock != NULL);
2443                 loi->loi_lvb = *lvb;
2444                 ldlm_lock_allow_match(lock);
2445                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2446                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2447                 rc = ELDLM_OK;
2448         }
2449
2450         if (lock != NULL) {
2451                 if (rc != ELDLM_OK)
2452                         ldlm_lock_fail_match(lock);
2453
2454                 LDLM_LOCK_PUT(lock);
2455         }
2456 }
2457 EXPORT_SYMBOL(osc_update_enqueue);
2458
/* Sentinel request-set value, not a real set: osc_enqueue_base() compares its
 * rqset argument against it and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to the caller's set. */
2459 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2460
2461 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2462  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2463  * other synchronous requests, however keeping some locks and trying to obtain
2464  * others may take a considerable amount of time in a case of ost failure; and
2465  * when other sync requests do not get released lock from a client, the client
2466  * is excluded from the cluster -- such scenarios make the life difficult, so
2467  * release locks just after they are obtained. */
/**
 * Obtain an extent lock, either by matching a cached one or by enqueuing a
 * new one.
 *
 * \param exp        export used for the enqueue
 * \param res_id     resource to lock
 * \param flags      in/out LDLM flags; LDLM_FL_LVB_READY may be set on
 *                   return and LDLM_FL_BLOCK_GRANTED is cleared before a new
 *                   enqueue is sent
 * \param policy     requested extent; widened in place to page boundaries
 * \param lvb        buffer for the lock value block
 * \param kms_valid  when zero, cached locks are not searched at all
 * \param upcall     callback run as upcall(cookie, rc) once the result is
 *                   known
 * \param cookie     opaque argument for \a upcall
 * \param einfo      enqueue parameters (mode, type, callbacks, data)
 * \param lockh      receives the handle of the matched or enqueued lock
 * \param rqset      NULL to finish the enqueue in this call; PTLRPCD_SET to
 *                   hand the request to ptlrpcd; otherwise the set to add
 *                   the request to
 * \param async      passed through to ldlm_cli_enqueue()
 * \param agl        non-zero for asynchronous glimpse lock requests
 *
 * \retval ELDLM_OK     a cached lock was matched and the upcall was run
 * \retval -ECANCELED   AGL request matched a lock without LDLM_FL_LVB_READY
 * \retval -ENOMEM      the intent request could not be allocated
 * \retval other        result of ldlm_prep_enqueue_req(), of
 *                      ldlm_cli_enqueue() (when \a rqset is set), or of
 *                      osc_enqueue_fini() (when it is not)
 */
2468 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2469                      __u64 *flags, ldlm_policy_data_t *policy,
2470                      struct ost_lvb *lvb, int kms_valid,
2471                      obd_enqueue_update_f upcall, void *cookie,
2472                      struct ldlm_enqueue_info *einfo,
2473                      struct lustre_handle *lockh,
2474                      struct ptlrpc_request_set *rqset, int async, int agl)
2475 {
2476         struct obd_device *obd = exp->exp_obd;
2477         struct ptlrpc_request *req = NULL;
2478         int intent = *flags & LDLM_FL_HAS_INTENT;
             /* AGL requests match locks whether or not their LVB is ready;
              * everything else only matches locks with LDLM_FL_LVB_READY. */
2479         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2480         ldlm_mode_t mode;
2481         int rc;
2482         ENTRY;
2483
2484         /* Filesystem lock extents are extended to page boundaries so that
2485          * dealing with the page cache is a little smoother.  */
2486         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2487         policy->l_extent.end |= ~CFS_PAGE_MASK;
2488
2489         /*
2490          * kms is not valid when either object is completely fresh (so that no
2491          * locks are cached), or object was evicted. In the latter case cached
2492          * lock cannot be used, because it would prime inode state with
2493          * potentially stale LVB.
2494          */
2495         if (!kms_valid)
2496                 goto no_match;
2497
2498         /* Next, search for already existing extent locks that will cover us */
2499         /* If we're trying to read, we also search for an existing PW lock.  The
2500          * VFS and page cache already protect us locally, so lots of readers/
2501          * writers can share a single PW lock.
2502          *
2503          * There are problems with conversion deadlocks, so instead of
2504          * converting a read lock to a write lock, we'll just enqueue a new
2505          * one.
2506          *
2507          * At some point we should cancel the read lock instead of making them
2508          * send us a blocking callback, but there are problems with canceling
2509          * locks out from other users right now, too. */
2510         mode = einfo->ei_mode;
2511         if (einfo->ei_mode == LCK_PR)
2512                 mode |= LCK_PW;
2513         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2514                                einfo->ei_type, policy, mode, lockh, 0);
2515         if (mode) {
                     /* NOTE(review): matched is dereferenced without a NULL
                      * check; presumably the reference held by a successful
                      * match keeps the handle valid - confirm. */
2516                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2517
2518                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2519                         /* For AGL, if enqueue RPC is sent but the lock is not
2520                          * granted, then skip to process this stripe.
2521                          * Return -ECANCELED to tell the caller. */
2522                         ldlm_lock_decref(lockh, mode);
2523                         LDLM_LOCK_PUT(matched);
2524                         RETURN(-ECANCELED);
2525                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2526                         *flags |= LDLM_FL_LVB_READY;
2527                         /* addref the lock only if not async requests and PW
2528                          * lock is matched whereas we asked for PR. */
2529                         if (!rqset && einfo->ei_mode != mode)
2530                                 ldlm_lock_addref(lockh, LCK_PR);
2531                         if (intent) {
2532                                 /* I would like to be able to ASSERT here that
2533                                  * rss <= kms, but I can't, for reasons which
2534                                  * are explained in lov_enqueue() */
2535                         }
2536
2537                         /* We already have a lock, and it's referenced.
2538                          *
2539                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2540                          * AGL upcall may change it to CLS_HELD directly. */
2541                         (*upcall)(cookie, ELDLM_OK);
2542
2543                         if (einfo->ei_mode != mode)
2544                                 ldlm_lock_decref(lockh, LCK_PW);
2545                         else if (rqset)
2546                                 /* For async requests, decref the lock. */
2547                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2548                         LDLM_LOCK_PUT(matched);
2549                         RETURN(ELDLM_OK);
2550                 } else {
                             /* The matched lock is bound to other callback
                              * data: drop it and enqueue a new one below. */
2551                         ldlm_lock_decref(lockh, mode);
2552                         LDLM_LOCK_PUT(matched);
2553                 }
2554         }
2555
2556  no_match:
             /* Intent enqueues get their request built here, with room
              * reserved for the LVB in the reply; otherwise req stays NULL
              * when passed to ldlm_cli_enqueue(). */
2557         if (intent) {
2558                 CFS_LIST_HEAD(cancels);
2559                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2560                                            &RQF_LDLM_ENQUEUE_LVB);
2561                 if (req == NULL)
2562                         RETURN(-ENOMEM);
2563
2564                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2565                 if (rc) {
2566                         ptlrpc_request_free(req);
2567                         RETURN(rc);
2568                 }
2569
2570                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2571                                      sizeof *lvb);
2572                 ptlrpc_request_set_replen(req);
2573         }
2574
2575         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2576         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2577
2578         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2579                               sizeof(*lvb), lockh, async);
2580         if (rqset) {
2581                 if (!rc) {
                             /* Stash everything osc_enqueue_interpret() needs
                              * in the request and queue it; the pointers are
                              * stored as is, not copied. */
2582                         struct osc_enqueue_args *aa;
2583                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2584                         aa = ptlrpc_req_async_args(req);
2585                         aa->oa_ei = einfo;
2586                         aa->oa_exp = exp;
2587                         aa->oa_flags  = flags;
2588                         aa->oa_upcall = upcall;
2589                         aa->oa_cookie = cookie;
2590                         aa->oa_lvb    = lvb;
2591                         aa->oa_lockh  = lockh;
2592                         aa->oa_agl    = !!agl;
2593
2594                         req->rq_interpret_reply =
2595                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2596                         if (rqset == PTLRPCD_SET)
2597                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2598                         else
2599                                 ptlrpc_set_add_req(rqset, req);
2600                 } else if (intent) {
2601                         ptlrpc_req_finished(req);
2602                 }
2603                 RETURN(rc);
2604         }
2605
             /* No request set: finish the osc side and run the upcall now. */
2606         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2607         if (intent)
2608                 ptlrpc_req_finished(req);
2609
2610         RETURN(rc);
2611 }
2612
2613 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2614                        struct ldlm_enqueue_info *einfo,
2615                        struct ptlrpc_request_set *rqset)
2616 {
2617         struct ldlm_res_id res_id;
2618         int rc;
2619         ENTRY;
2620
2621         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2622                            oinfo->oi_md->lsm_object_seq, &res_id);
2623
2624         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2625                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2626                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2627                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2628                               rqset, rqset != NULL, 0);
2629         RETURN(rc);
2630 }
2631
2632 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2633                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2634                    int *flags, void *data, struct lustre_handle *lockh,
2635                    int unref)
2636 {
2637         struct obd_device *obd = exp->exp_obd;
2638         int lflags = *flags;
2639         ldlm_mode_t rc;
2640         ENTRY;
2641
2642         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2643                 RETURN(-EIO);
2644
2645         /* Filesystem lock extents are extended to page boundaries so that
2646          * dealing with the page cache is a little smoother */
2647         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2648         policy->l_extent.end |= ~CFS_PAGE_MASK;
2649
2650         /* Next, search for already existing extent locks that will cover us */
2651         /* If we're trying to read, we also search for an existing PW lock.  The
2652          * VFS and page cache already protect us locally, so lots of readers/
2653          * writers can share a single PW lock. */
2654         rc = mode;
2655         if (mode == LCK_PR)
2656                 rc |= LCK_PW;
2657         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2658                              res_id, type, policy, rc, lockh, unref);
2659         if (rc) {
2660                 if (data != NULL) {
2661                         if (!osc_set_data_with_check(lockh, data)) {
2662                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2663                                         ldlm_lock_decref(lockh, rc);
2664                                 RETURN(0);
2665                         }
2666                 }
2667                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2668                         ldlm_lock_addref(lockh, LCK_PR);
2669                         ldlm_lock_decref(lockh, LCK_PW);
2670                 }
2671                 RETURN(rc);
2672         }
2673         RETURN(rc);
2674 }
2675
2676 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2677 {
2678         ENTRY;
2679
2680         if (unlikely(mode == LCK_GROUP))
2681                 ldlm_lock_decref_and_cancel(lockh, mode);
2682         else
2683                 ldlm_lock_decref(lockh, mode);
2684
2685         RETURN(0);
2686 }
2687
2688 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2689                       __u32 mode, struct lustre_handle *lockh)
2690 {
2691         ENTRY;
2692         RETURN(osc_cancel_base(lockh, mode));
2693 }
2694
2695 static int osc_cancel_unused(struct obd_export *exp,
2696                              struct lov_stripe_md *lsm,
2697                              ldlm_cancel_flags_t flags,
2698                              void *opaque)
2699 {
2700         struct obd_device *obd = class_exp2obd(exp);
2701         struct ldlm_res_id res_id, *resp = NULL;
2702
2703         if (lsm != NULL) {
2704                 resp = osc_build_res_name(lsm->lsm_object_id,
2705                                           lsm->lsm_object_seq, &res_id);
2706         }
2707
2708         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2709 }
2710
2711 static int osc_statfs_interpret(const struct lu_env *env,
2712                                 struct ptlrpc_request *req,
2713                                 struct osc_async_args *aa, int rc)
2714 {
2715         struct obd_statfs *msfs;
2716         ENTRY;
2717
2718         if (rc == -EBADR)
2719                 /* The request has in fact never been sent
2720                  * due to issues at a higher level (LOV).
2721                  * Exit immediately since the caller is
2722                  * aware of the problem and takes care
2723                  * of the clean up */
2724                  RETURN(rc);
2725
2726         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2727             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2728                 GOTO(out, rc = 0);
2729
2730         if (rc != 0)
2731                 GOTO(out, rc);
2732
2733         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2734         if (msfs == NULL) {
2735                 GOTO(out, rc = -EPROTO);
2736         }
2737
2738         *aa->aa_oi->oi_osfs = *msfs;
2739 out:
2740         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2741         RETURN(rc);
2742 }
2743
2744 static int osc_statfs_async(struct obd_export *exp,
2745                             struct obd_info *oinfo, __u64 max_age,
2746                             struct ptlrpc_request_set *rqset)
2747 {
2748         struct obd_device     *obd = class_exp2obd(exp);
2749         struct ptlrpc_request *req;
2750         struct osc_async_args *aa;
2751         int                    rc;
2752         ENTRY;
2753
2754         /* We could possibly pass max_age in the request (as an absolute
2755          * timestamp or a "seconds.usec ago") so the target can avoid doing
2756          * extra calls into the filesystem if that isn't necessary (e.g.
2757          * during mount that would help a bit).  Having relative timestamps
2758          * is not so great if request processing is slow, while absolute
2759          * timestamps are not ideal because they need time synchronization. */
2760         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2761         if (req == NULL)
2762                 RETURN(-ENOMEM);
2763
2764         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2765         if (rc) {
2766                 ptlrpc_request_free(req);
2767                 RETURN(rc);
2768         }
2769         ptlrpc_request_set_replen(req);
2770         req->rq_request_portal = OST_CREATE_PORTAL;
2771         ptlrpc_at_set_req_timeout(req);
2772
2773         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2774                 /* procfs requests not want stat in wait for avoid deadlock */
2775                 req->rq_no_resend = 1;
2776                 req->rq_no_delay = 1;
2777         }
2778
2779         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2780         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2781         aa = ptlrpc_req_async_args(req);
2782         aa->aa_oi = oinfo;
2783
2784         ptlrpc_set_add_req(rqset, req);
2785         RETURN(0);
2786 }
2787
2788 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2789                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2790 {
2791         struct obd_device     *obd = class_exp2obd(exp);
2792         struct obd_statfs     *msfs;
2793         struct ptlrpc_request *req;
2794         struct obd_import     *imp = NULL;
2795         int rc;
2796         ENTRY;
2797
2798         /*Since the request might also come from lprocfs, so we need
2799          *sync this with client_disconnect_export Bug15684*/
2800         down_read(&obd->u.cli.cl_sem);
2801         if (obd->u.cli.cl_import)
2802                 imp = class_import_get(obd->u.cli.cl_import);
2803         up_read(&obd->u.cli.cl_sem);
2804         if (!imp)
2805                 RETURN(-ENODEV);
2806
2807         /* We could possibly pass max_age in the request (as an absolute
2808          * timestamp or a "seconds.usec ago") so the target can avoid doing
2809          * extra calls into the filesystem if that isn't necessary (e.g.
2810          * during mount that would help a bit).  Having relative timestamps
2811          * is not so great if request processing is slow, while absolute
2812          * timestamps are not ideal because they need time synchronization. */
2813         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2814
2815         class_import_put(imp);
2816
2817         if (req == NULL)
2818                 RETURN(-ENOMEM);
2819
2820         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2821         if (rc) {
2822                 ptlrpc_request_free(req);
2823                 RETURN(rc);
2824         }
2825         ptlrpc_request_set_replen(req);
2826         req->rq_request_portal = OST_CREATE_PORTAL;
2827         ptlrpc_at_set_req_timeout(req);
2828
2829         if (flags & OBD_STATFS_NODELAY) {
2830                 /* procfs requests not want stat in wait for avoid deadlock */
2831                 req->rq_no_resend = 1;
2832                 req->rq_no_delay = 1;
2833         }
2834
2835         rc = ptlrpc_queue_wait(req);
2836         if (rc)
2837                 GOTO(out, rc);
2838
2839         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2840         if (msfs == NULL) {
2841                 GOTO(out, rc = -EPROTO);
2842         }
2843
2844         *osfs = *msfs;
2845
2846         EXIT;
2847  out:
2848         ptlrpc_req_finished(req);
2849         return rc;
2850 }
2851
2852 /* Retrieve object striping information.
2853  *
2854  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2855  * the maximum number of OST indices which will fit in the user buffer.
2856  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2857  */
2858 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2859 {
2860         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2861         struct lov_user_md_v3 lum, *lumk;
2862         struct lov_user_ost_data_v1 *lmm_objects;
2863         int rc = 0, lum_size;
2864         ENTRY;
2865
2866         if (!lsm)
2867                 RETURN(-ENODATA);
2868
2869         /* we only need the header part from user space to get lmm_magic and
2870          * lmm_stripe_count, (the header part is common to v1 and v3) */
2871         lum_size = sizeof(struct lov_user_md_v1);
2872         if (cfs_copy_from_user(&lum, lump, lum_size))
2873                 RETURN(-EFAULT);
2874
2875         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2876             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2877                 RETURN(-EINVAL);
2878
2879         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2880         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2881         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2882         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2883
2884         /* we can use lov_mds_md_size() to compute lum_size
2885          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2886         if (lum.lmm_stripe_count > 0) {
2887                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2888                 OBD_ALLOC(lumk, lum_size);
2889                 if (!lumk)
2890                         RETURN(-ENOMEM);
2891
2892                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2893                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2894                 else
2895                         lmm_objects = &(lumk->lmm_objects[0]);
2896                 lmm_objects->l_object_id = lsm->lsm_object_id;
2897         } else {
2898                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2899                 lumk = &lum;
2900         }
2901
2902         lumk->lmm_object_id = lsm->lsm_object_id;
2903         lumk->lmm_object_seq = lsm->lsm_object_seq;
2904         lumk->lmm_stripe_count = 1;
2905
2906         if (cfs_copy_to_user(lump, lumk, lum_size))
2907                 rc = -EFAULT;
2908
2909         if (lumk != &lum)
2910                 OBD_FREE(lumk, lum_size);
2911
2912         RETURN(rc);
2913 }
2914
2915
2916 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2917                          void *karg, void *uarg)
2918 {
2919         struct obd_device *obd = exp->exp_obd;
2920         struct obd_ioctl_data *data = karg;
2921         int err = 0;
2922         ENTRY;
2923
2924         if (!cfs_try_module_get(THIS_MODULE)) {
2925                 CERROR("Can't get module. Is it alive?");
2926                 return -EINVAL;
2927         }
2928         switch (cmd) {
2929         case OBD_IOC_LOV_GET_CONFIG: {
2930                 char *buf;
2931                 struct lov_desc *desc;
2932                 struct obd_uuid uuid;
2933
2934                 buf = NULL;
2935                 len = 0;
2936                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2937                         GOTO(out, err = -EINVAL);
2938
2939                 data = (struct obd_ioctl_data *)buf;
2940
2941                 if (sizeof(*desc) > data->ioc_inllen1) {
2942                         obd_ioctl_freedata(buf, len);
2943                         GOTO(out, err = -EINVAL);
2944                 }
2945
2946                 if (data->ioc_inllen2 < sizeof(uuid)) {
2947                         obd_ioctl_freedata(buf, len);
2948                         GOTO(out, err = -EINVAL);
2949                 }
2950
2951                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2952                 desc->ld_tgt_count = 1;
2953                 desc->ld_active_tgt_count = 1;
2954                 desc->ld_default_stripe_count = 1;
2955                 desc->ld_default_stripe_size = 0;
2956                 desc->ld_default_stripe_offset = 0;
2957                 desc->ld_pattern = 0;
2958                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2959
2960                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2961
2962                 err = cfs_copy_to_user((void *)uarg, buf, len);
2963                 if (err)
2964                         err = -EFAULT;
2965                 obd_ioctl_freedata(buf, len);
2966                 GOTO(out, err);
2967         }
2968         case LL_IOC_LOV_SETSTRIPE:
2969                 err = obd_alloc_memmd(exp, karg);
2970                 if (err > 0)
2971                         err = 0;
2972                 GOTO(out, err);
2973         case LL_IOC_LOV_GETSTRIPE:
2974                 err = osc_getstripe(karg, uarg);
2975                 GOTO(out, err);
2976         case OBD_IOC_CLIENT_RECOVER:
2977                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2978                                             data->ioc_inlbuf1, 0);
2979                 if (err > 0)
2980                         err = 0;
2981                 GOTO(out, err);
2982         case IOC_OSC_SET_ACTIVE:
2983                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2984                                                data->ioc_offset);
2985                 GOTO(out, err);
2986         case OBD_IOC_POLL_QUOTACHECK:
2987                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2988                 GOTO(out, err);
2989         case OBD_IOC_PING_TARGET:
2990                 err = ptlrpc_obd_ping(obd);
2991                 GOTO(out, err);
2992         default:
2993                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2994                        cmd, cfs_curproc_comm());
2995                 GOTO(out, err = -ENOTTY);
2996         }
2997 out:
2998         cfs_module_put(THIS_MODULE);
2999         return err;
3000 }
3001
3002 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3003                         obd_count keylen, void *key, __u32 *vallen, void *val,
3004                         struct lov_stripe_md *lsm)
3005 {
3006         ENTRY;
3007         if (!vallen || !val)
3008                 RETURN(-EFAULT);
3009
3010         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3011                 __u32 *stripe = val;
3012                 *vallen = sizeof(*stripe);
3013                 *stripe = 0;
3014                 RETURN(0);
3015         } else if (KEY_IS(KEY_LAST_ID)) {
3016                 struct ptlrpc_request *req;
3017                 obd_id                *reply;
3018                 char                  *tmp;
3019                 int                    rc;
3020
3021                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3022                                            &RQF_OST_GET_INFO_LAST_ID);
3023                 if (req == NULL)
3024                         RETURN(-ENOMEM);
3025
3026                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3027                                      RCL_CLIENT, keylen);
3028                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3029                 if (rc) {
3030                         ptlrpc_request_free(req);
3031                         RETURN(rc);
3032                 }
3033
3034                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3035                 memcpy(tmp, key, keylen);
3036
3037                 req->rq_no_delay = req->rq_no_resend = 1;
3038                 ptlrpc_request_set_replen(req);
3039                 rc = ptlrpc_queue_wait(req);
3040                 if (rc)
3041                         GOTO(out, rc);
3042
3043                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3044                 if (reply == NULL)
3045                         GOTO(out, rc = -EPROTO);
3046
3047                 *((obd_id *)val) = *reply;
3048         out:
3049                 ptlrpc_req_finished(req);
3050                 RETURN(rc);
3051         } else if (KEY_IS(KEY_FIEMAP)) {
3052                 struct ptlrpc_request *req;
3053                 struct ll_user_fiemap *reply;
3054                 char *tmp;
3055                 int rc;
3056
3057                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3058                                            &RQF_OST_GET_INFO_FIEMAP);
3059                 if (req == NULL)
3060                         RETURN(-ENOMEM);
3061
3062                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3063                                      RCL_CLIENT, keylen);
3064                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3065                                      RCL_CLIENT, *vallen);
3066                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3067                                      RCL_SERVER, *vallen);
3068
3069                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3070                 if (rc) {
3071                         ptlrpc_request_free(req);
3072                         RETURN(rc);
3073                 }
3074
3075                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3076                 memcpy(tmp, key, keylen);
3077                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3078                 memcpy(tmp, val, *vallen);
3079
3080                 ptlrpc_request_set_replen(req);
3081                 rc = ptlrpc_queue_wait(req);
3082                 if (rc)
3083                         GOTO(out1, rc);
3084
3085                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3086                 if (reply == NULL)
3087                         GOTO(out1, rc = -EPROTO);
3088
3089                 memcpy(val, reply, *vallen);
3090         out1:
3091                 ptlrpc_req_finished(req);
3092
3093                 RETURN(rc);
3094         }
3095
3096         RETURN(-EINVAL);
3097 }
3098
3099 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3100                               obd_count keylen, void *key, obd_count vallen,
3101                               void *val, struct ptlrpc_request_set *set)
3102 {
3103         struct ptlrpc_request *req;
3104         struct obd_device     *obd = exp->exp_obd;
3105         struct obd_import     *imp = class_exp2cliimp(exp);
3106         char                  *tmp;
3107         int                    rc;
3108         ENTRY;
3109
3110         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3111
3112         if (KEY_IS(KEY_CHECKSUM)) {
3113                 if (vallen != sizeof(int))
3114                         RETURN(-EINVAL);
3115                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3116                 RETURN(0);
3117         }
3118
3119         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3120                 sptlrpc_conf_client_adapt(obd);
3121                 RETURN(0);
3122         }
3123
3124         if (KEY_IS(KEY_FLUSH_CTX)) {
3125                 sptlrpc_import_flush_my_ctx(imp);
3126                 RETURN(0);
3127         }
3128
3129         if (KEY_IS(KEY_CACHE_SET)) {
3130                 struct client_obd *cli = &obd->u.cli;
3131
3132                 LASSERT(cli->cl_cache == NULL); /* only once */
3133                 cli->cl_cache = (struct cl_client_cache *)val;
3134                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3135                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3136
3137                 /* add this osc into entity list */
3138                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3139                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3140                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3141                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3142
3143                 RETURN(0);
3144         }
3145
3146         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3147                 struct client_obd *cli = &obd->u.cli;
3148                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3149                 int target = *(int *)val;
3150
3151                 nr = osc_lru_shrink(cli, min(nr, target));
3152                 *(int *)val -= nr;
3153                 RETURN(0);
3154         }
3155
3156         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3157                 RETURN(-EINVAL);
3158
3159         /* We pass all other commands directly to OST. Since nobody calls osc
3160            methods directly and everybody is supposed to go through LOV, we
3161            assume lov checked invalid values for us.
3162            The only recognised values so far are evict_by_nid and mds_conn.
3163            Even if something bad goes through, we'd get a -EINVAL from OST
3164            anyway. */
3165
3166         if (KEY_IS(KEY_GRANT_SHRINK))
3167                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3168         else
3169                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3170
3171         if (req == NULL)
3172                 RETURN(-ENOMEM);
3173
3174         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3175                              RCL_CLIENT, keylen);
3176         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3177                              RCL_CLIENT, vallen);
3178         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3179         if (rc) {
3180                 ptlrpc_request_free(req);
3181                 RETURN(rc);
3182         }
3183
3184         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3185         memcpy(tmp, key, keylen);
3186         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3187         memcpy(tmp, val, vallen);
3188
3189         if (KEY_IS(KEY_GRANT_SHRINK)) {
3190                 struct osc_grant_args *aa;
3191                 struct obdo *oa;
3192
3193                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3194                 aa = ptlrpc_req_async_args(req);
3195                 OBDO_ALLOC(oa);
3196                 if (!oa) {
3197                         ptlrpc_req_finished(req);
3198                         RETURN(-ENOMEM);
3199                 }
3200                 *oa = ((struct ost_body *)val)->oa;
3201                 aa->aa_oa = oa;
3202                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3203         }
3204
3205         ptlrpc_request_set_replen(req);
3206         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3207                 LASSERT(set != NULL);
3208                 ptlrpc_set_add_req(set, req);
3209                 ptlrpc_check_set(NULL, set);
3210         } else
3211                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3212
3213         RETURN(0);
3214 }
3215
3216
/**
 * Obsolete llog initialisation hook.
 *
 * This path must not be reached when LOD/OSP is in use and is slated for
 * removal; any call is treated as a fatal bug.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        /* reaching this point is a programming error */
        LBUG();

        return 0;
}
3225
3226 static int osc_llog_finish(struct obd_device *obd, int count)
3227 {
3228         struct llog_ctxt *ctxt;
3229
3230         ENTRY;
3231
3232         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3233         if (ctxt) {
3234                 llog_cat_close(NULL, ctxt->loc_handle);
3235                 llog_cleanup(NULL, ctxt);
3236         }
3237
3238         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3239         if (ctxt)
3240                 llog_cleanup(NULL, ctxt);
3241         RETURN(0);
3242 }
3243
3244 static int osc_reconnect(const struct lu_env *env,
3245                          struct obd_export *exp, struct obd_device *obd,
3246                          struct obd_uuid *cluuid,
3247                          struct obd_connect_data *data,
3248                          void *localdata)
3249 {
3250         struct client_obd *cli = &obd->u.cli;
3251
3252         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3253                 long lost_grant;
3254
3255                 client_obd_list_lock(&cli->cl_loi_list_lock);
3256                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3257                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3258                 lost_grant = cli->cl_lost_grant;
3259                 cli->cl_lost_grant = 0;
3260                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3261
3262                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3263                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3264                        data->ocd_version, data->ocd_grant, lost_grant);
3265         }
3266
3267         RETURN(0);
3268 }
3269
3270 static int osc_disconnect(struct obd_export *exp)
3271 {
3272         struct obd_device *obd = class_exp2obd(exp);
3273         struct llog_ctxt  *ctxt;
3274         int rc;
3275
3276         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3277         if (ctxt) {
3278                 if (obd->u.cli.cl_conn_count == 1) {
3279                         /* Flush any remaining cancel messages out to the
3280                          * target */
3281                         llog_sync(ctxt, exp, 0);
3282                 }
3283                 llog_ctxt_put(ctxt);
3284         } else {
3285                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3286                        obd);
3287         }
3288
3289         rc = client_disconnect_export(exp);
3290         /**
3291          * Initially we put del_shrink_grant before disconnect_export, but it
3292          * causes the following problem if setup (connect) and cleanup
3293          * (disconnect) are tangled together.
3294          *      connect p1                     disconnect p2
3295          *   ptlrpc_connect_import
3296          *     ...............               class_manual_cleanup
3297          *                                     osc_disconnect
3298          *                                     del_shrink_grant
3299          *   ptlrpc_connect_interrupt
3300          *     init_grant_shrink
3301          *   add this client to shrink list
3302          *                                      cleanup_osc
3303          * Bang! pinger trigger the shrink.
3304          * So the osc should be disconnected from the shrink list, after we
3305          * are sure the import has been destroyed. BUG18662
3306          */
3307         if (obd->u.cli.cl_import == NULL)
3308                 osc_del_shrink_grant(&obd->u.cli);
3309         return rc;
3310 }
3311
3312 static int osc_import_event(struct obd_device *obd,
3313                             struct obd_import *imp,
3314                             enum obd_import_event event)
3315 {
3316         struct client_obd *cli;
3317         int rc = 0;
3318
3319         ENTRY;
3320         LASSERT(imp->imp_obd == obd);
3321
3322         switch (event) {
3323         case IMP_EVENT_DISCON: {
3324                 cli = &obd->u.cli;
3325                 client_obd_list_lock(&cli->cl_loi_list_lock);
3326                 cli->cl_avail_grant = 0;
3327                 cli->cl_lost_grant = 0;
3328                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3329                 break;
3330         }
3331         case IMP_EVENT_INACTIVE: {
3332                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3333                 break;
3334         }
3335         case IMP_EVENT_INVALIDATE: {
3336                 struct ldlm_namespace *ns = obd->obd_namespace;
3337                 struct lu_env         *env;
3338                 int                    refcheck;
3339
3340                 env = cl_env_get(&refcheck);
3341                 if (!IS_ERR(env)) {
3342                         /* Reset grants */
3343                         cli = &obd->u.cli;
3344                         /* all pages go to failing rpcs due to the invalid
3345                          * import */
3346                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3347
3348                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3349                         cl_env_put(env, &refcheck);
3350                 } else
3351                         rc = PTR_ERR(env);
3352                 break;
3353         }
3354         case IMP_EVENT_ACTIVE: {
3355                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3356                 break;
3357         }
3358         case IMP_EVENT_OCD: {
3359                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3360
3361                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3362                         osc_init_grant(&obd->u.cli, ocd);
3363
3364                 /* See bug 7198 */
3365                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3366                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3367
3368                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3369                 break;
3370         }
3371         case IMP_EVENT_DEACTIVATE: {
3372                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3373                 break;
3374         }
3375         case IMP_EVENT_ACTIVATE: {
3376                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3377                 break;
3378         }
3379         default:
3380                 CERROR("Unknown import event %d\n", event);
3381                 LBUG();
3382         }
3383         RETURN(rc);
3384 }
3385
3386 /**
3387  * Determine whether the lock can be canceled before replaying the lock
3388  * during recovery, see bug16774 for detailed information.
3389  *
3390  * \retval zero the lock can't be canceled
3391  * \retval other ok to cancel
3392  */
3393 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3394 {
3395         check_res_locked(lock->l_resource);
3396
3397         /*
3398          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3399          *
3400          * XXX as a future improvement, we can also cancel unused write lock
3401          * if it doesn't have dirty data and active mmaps.
3402          */
3403         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3404             (lock->l_granted_mode == LCK_PR ||
3405              lock->l_granted_mode == LCK_CR) &&
3406             (osc_dlm_lock_pageref(lock) == 0))
3407                 RETURN(1);
3408
3409         RETURN(0);
3410 }
3411
3412 static int brw_queue_work(const struct lu_env *env, void *data)
3413 {
3414         struct client_obd *cli = data;
3415
3416         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3417
3418         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3419         RETURN(0);
3420 }
3421
3422 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3423 {
3424         struct lprocfs_static_vars lvars = { 0 };
3425         struct client_obd          *cli = &obd->u.cli;
3426         void                       *handler;
3427         int                        rc;
3428         ENTRY;
3429
3430         rc = ptlrpcd_addref();
3431         if (rc)
3432                 RETURN(rc);
3433
3434         rc = client_obd_setup(obd, lcfg);
3435         if (rc)
3436                 GOTO(out_ptlrpcd, rc);
3437
3438         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3439         if (IS_ERR(handler))
3440                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3441         cli->cl_writeback_work = handler;
3442
3443         rc = osc_quota_setup(obd);
3444         if (rc)
3445                 GOTO(out_ptlrpcd_work, rc);
3446
3447         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3448         lprocfs_osc_init_vars(&lvars);
3449         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3450                 lproc_osc_attach_seqstat(obd);
3451                 sptlrpc_lprocfs_cliobd_attach(obd);
3452                 ptlrpc_lprocfs_register_obd(obd);
3453         }
3454
3455         /* We need to allocate a few requests more, because
3456          * brw_interpret tries to create new requests before freeing
3457          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3458          * reserved, but I'm afraid that might be too much wasted RAM
3459          * in fact, so 2 is just my guess and still should work. */
3460         cli->cl_import->imp_rq_pool =
3461                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3462                                     OST_MAXREQSIZE,
3463                                     ptlrpc_add_rqs_to_pool);
3464
3465         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3466         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3467         RETURN(rc);
3468
3469 out_ptlrpcd_work:
3470         ptlrpcd_destroy_work(handler);
3471 out_client_setup:
3472         client_obd_cleanup(obd);
3473 out_ptlrpcd:
3474         ptlrpcd_decref();
3475         RETURN(rc);
3476 }
3477
3478 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3479 {
3480         int rc = 0;
3481         ENTRY;
3482
3483         switch (stage) {
3484         case OBD_CLEANUP_EARLY: {
3485                 struct obd_import *imp;
3486                 imp = obd->u.cli.cl_import;
3487                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3488                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3489                 ptlrpc_deactivate_import(imp);
3490                 spin_lock(&imp->imp_lock);
3491                 imp->imp_pingable = 0;
3492                 spin_unlock(&imp->imp_lock);
3493                 break;
3494         }
3495         case OBD_CLEANUP_EXPORTS: {
3496                 struct client_obd *cli = &obd->u.cli;
3497                 /* LU-464
3498                  * for echo client, export may be on zombie list, wait for
3499                  * zombie thread to cull it, because cli.cl_import will be
3500                  * cleared in client_disconnect_export():
3501                  *   class_export_destroy() -> obd_cleanup() ->
3502                  *   echo_device_free() -> echo_client_cleanup() ->
3503                  *   obd_disconnect() -> osc_disconnect() ->
3504                  *   client_disconnect_export()
3505                  */
3506                 obd_zombie_barrier();
3507                 if (cli->cl_writeback_work) {
3508                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3509                         cli->cl_writeback_work = NULL;
3510                 }
3511                 obd_cleanup_client_import(obd);
3512                 ptlrpc_lprocfs_unregister_obd(obd);
3513                 lprocfs_obd_cleanup(obd);
3514                 rc = obd_llog_finish(obd, 0);
3515                 if (rc != 0)
3516                         CERROR("failed to cleanup llogging subsystems\n");
3517                 break;
3518                 }
3519         }
3520         RETURN(rc);
3521 }
3522
3523 int osc_cleanup(struct obd_device *obd)
3524 {
3525         struct client_obd *cli = &obd->u.cli;
3526         int rc;
3527
3528         ENTRY;
3529
3530         /* lru cleanup */
3531         if (cli->cl_cache != NULL) {
3532                 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3533                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3534                 cfs_list_del_init(&cli->cl_lru_osc);
3535                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3536                 cli->cl_lru_left = NULL;
3537                 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3538                 cli->cl_cache = NULL;
3539         }
3540
3541         /* free memory of osc quota cache */
3542         osc_quota_cleanup(obd);
3543
3544         rc = client_obd_cleanup(obd);
3545
3546         ptlrpcd_decref();
3547         RETURN(rc);
3548 }
3549
3550 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3551 {
3552         struct lprocfs_static_vars lvars = { 0 };
3553         int rc = 0;
3554
3555         lprocfs_osc_init_vars(&lvars);
3556
3557         switch (lcfg->lcfg_command) {
3558         default:
3559                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3560                                               lcfg, obd);
3561                 if (rc > 0)
3562                         rc = 0;
3563                 break;
3564         }
3565
3566         return(rc);
3567 }
3568
3569 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3570 {
3571         return osc_process_config_base(obd, buf);
3572 }
3573
3574 struct obd_ops osc_obd_ops = {
3575         .o_owner                = THIS_MODULE,
3576         .o_setup                = osc_setup,
3577         .o_precleanup           = osc_precleanup,
3578         .o_cleanup              = osc_cleanup,
3579         .o_add_conn             = client_import_add_conn,
3580         .o_del_conn             = client_import_del_conn,
3581         .o_connect              = client_connect_import,
3582         .o_reconnect            = osc_reconnect,
3583         .o_disconnect           = osc_disconnect,
3584         .o_statfs               = osc_statfs,
3585         .o_statfs_async         = osc_statfs_async,
3586         .o_packmd               = osc_packmd,
3587         .o_unpackmd             = osc_unpackmd,
3588         .o_create               = osc_create,
3589         .o_destroy              = osc_destroy,
3590         .o_getattr              = osc_getattr,
3591         .o_getattr_async        = osc_getattr_async,
3592         .o_setattr              = osc_setattr,
3593         .o_setattr_async        = osc_setattr_async,
3594         .o_brw                  = osc_brw,
3595         .o_punch                = osc_punch,
3596         .o_sync                 = osc_sync,
3597         .o_enqueue              = osc_enqueue,
3598         .o_change_cbdata        = osc_change_cbdata,
3599         .o_find_cbdata          = osc_find_cbdata,
3600         .o_cancel               = osc_cancel,
3601         .o_cancel_unused        = osc_cancel_unused,
3602         .o_iocontrol            = osc_iocontrol,
3603         .o_get_info             = osc_get_info,
3604         .o_set_info_async       = osc_set_info_async,
3605         .o_import_event         = osc_import_event,
3606         .o_llog_init            = osc_llog_init,
3607         .o_llog_finish          = osc_llog_finish,
3608         .o_process_config       = osc_process_config,
3609         .o_quotactl             = osc_quotactl,
3610         .o_quotacheck           = osc_quotacheck,
3611 };
3612
3613 extern struct lu_kmem_descr osc_caches[];
3614 extern spinlock_t osc_ast_guard;
3615 extern struct lock_class_key osc_ast_guard_class;
3616
3617 int __init osc_init(void)
3618 {
3619         struct lprocfs_static_vars lvars = { 0 };
3620         int rc;
3621         ENTRY;
3622
3623         /* print an address of _any_ initialized kernel symbol from this
3624          * module, to allow debugging with gdb that doesn't support data
3625          * symbols from modules.*/
3626         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3627
3628         rc = lu_kmem_init(osc_caches);
3629
3630         lprocfs_osc_init_vars(&lvars);
3631
3632         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3633                                  LUSTRE_OSC_NAME, &osc_device_type);
3634         if (rc) {
3635                 lu_kmem_fini(osc_caches);
3636                 RETURN(rc);
3637         }
3638
3639         spin_lock_init(&osc_ast_guard);
3640         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3641
3642         RETURN(rc);
3643 }
3644
#ifdef __KERNEL__
/*
 * Module unload: unregister the OSC obd type, then destroy the slab
 * caches created by osc_init().
 */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

/* kernel module metadata and entry points */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */