Whamcloud - gitweb
LU-2139 osc: Move cl_client_lru to cl_client_cache
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirrors osc_packmd():
 *   lsmp == NULL                 : size query, returns the single-stripe
 *                                  lov_stripe_md size;
 *   *lsmp != NULL && lmm == NULL : free the previously unpacked lsm;
 *   otherwise                    : (allocate and) fill *lsmp from @lmm,
 *                                  validating @lmm first when present.
 * Returns the lsm size on success, 0 after a free, negative errno on error. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        /* Validate the on-disk/wire image before touching it. */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC always deals with exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Free request: existing lsm and nothing to unpack. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        /* Allocate the lsm plus its single lov_oinfo on first use. */
        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        /* Use the server-advertised size limit when the import negotiated
         * OBD_CONNECT_MAXBYTES; fall back to the static maximum otherwise. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
/* Reply callback for an async OST_GETATTR (see osc_getattr_async()):
 * unpack the server's obdo into the caller's obd_info, then invoke the
 * caller's oi_cb_up upcall with the final status. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                /* Invalidate the obdo so stale fields are not trusted. */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* The upcall always runs, on success and on failure alike. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
/* Synchronous OST_GETATTR: fetch the attributes of the object described
 * by oinfo->oi_oa into that same obdo.  Returns 0 or a negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* The capa field must be sized before the request is packed. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Send and block for the reply. */
        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
306
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and copy the server's resulting obdo back into it.  The caller
 * must have set a valid group (OBD_MD_FLGROUP) in the obdo.
 * Returns 0 or a negative errno. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* The capa field must be sized before the request is packed. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Send and block for the reply. */
        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
347
/* Reply callback shared by async setattr and punch (see
 * osc_setattr_async_base() and osc_punch_base()): unpack the server's
 * obdo into sa->sa_oa, then invoke the caller's upcall with the status. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        /* The upcall always runs, on success and on failure alike. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
367
/* Send an OST_SETATTR asynchronously.
 *
 * The destination of the request depends on @rqset:
 *   NULL         - fire-and-forget via ptlrpcd, no reply interpretation
 *                  and no upcall;
 *   PTLRPCD_SET  - handed to ptlrpcd, reply handled by
 *                  osc_setattr_interpret() which fires @upcall(@cookie);
 *   otherwise    - added to the caller's set, same reply handling.
 * When @oti carries llog cookies and the obdo has OBD_MD_FLCOOKIE set,
 * the cookie is copied into the obdo before packing.
 * Returns 0 or a negative errno from request allocation/packing. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* The capa field must be sized before the request is packed. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
/* Synchronous OST_CREATE: create the object described by @oa on the OST.
 *
 * If *ea is NULL a single-stripe lsm is allocated here; on success it is
 * handed back through *ea (caller owns it), on failure it is freed again.
 * On success the server's obdo is copied back into @oa and the new object
 * id/seq are recorded in the lsm.  When @oti is given, the reply transno
 * and (if OBD_MD_FLCOOKIE) the llog cookie are stored into it.
 * Returns 0 or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* No lsm supplied: allocate one to receive the object id. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* Note: o_flags is only meaningful when OBD_MD_FLFLAGS is set. */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm if we allocated it (i.e. *ea is still NULL). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
511
/* Send an asynchronous OST_PUNCH (truncate).  The punch extent is carried
 * in oinfo->oi_oa's o_size/o_blocks fields (set by the caller, see
 * osc_punch()).  The request goes to ptlrpcd when @rqset is PTLRPCD_SET,
 * otherwise onto the caller's set; the reply is handled by
 * osc_setattr_interpret(), which fires @upcall(@cookie).
 * Returns 0 or a negative errno from allocation/packing. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* The capa field must be sized before the request is packed. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
/* Reply callback for OST_SYNC (see osc_sync_base()): copy the server's
 * obdo back into the caller's obd_info, then invoke the caller's upcall
 * with the final status. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Struct copy of the reply obdo into the caller's obdo. */
        *fa->fa_oi->oi_oa = body->oa;
out:
        /* The upcall always runs, on success and on failure alike. */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
589
/* Send an asynchronous OST_SYNC for the byte range carried in
 * oinfo->oi_oa's size/blocks fields (set by the caller, see osc_sync()).
 * The request goes to ptlrpcd when @rqset is PTLRPCD_SET, otherwise onto
 * the caller's set; the reply is handled by osc_sync_interpret(), which
 * fires @upcall(@cookie).  Returns 0 or a negative errno. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* The capa field must be sized before the request is packed. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
666         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
667         if (res == NULL)
668                 RETURN(0);
669
670         LDLM_RESOURCE_ADDREF(res);
671         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
672                                            lock_flags, 0, NULL);
673         LDLM_RESOURCE_DELREF(res);
674         ldlm_resource_putref(res);
675         RETURN(count);
676 }
677
678 static int osc_destroy_interpret(const struct lu_env *env,
679                                  struct ptlrpc_request *req, void *data,
680                                  int rc)
681 {
682         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
683
684         cfs_atomic_dec(&cli->cl_destroy_in_flight);
685         cfs_waitq_signal(&cli->cl_destroy_waitq);
686         return 0;
687 }
688
/* Try to take one destroy-RPC slot.  Returns 1 when the in-flight count
 * was raised without exceeding cl_max_rpcs_in_flight, 0 otherwise.
 * The counter is not under any lock, so this is an optimistic
 * increment-then-undo: if the undo observes that a slot freed up in
 * between, the waitq is re-signalled so no waiter is lost. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: give the slot back. */
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
706
707 int osc_create(const struct lu_env *env, struct obd_export *exp,
708                struct obdo *oa, struct lov_stripe_md **ea,
709                struct obd_trans_info *oti)
710 {
711         int rc = 0;
712         ENTRY;
713
714         LASSERT(oa);
715         LASSERT(ea);
716         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
717
718         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
719             oa->o_flags == OBD_FL_RECREATE_OBJS) {
720                 RETURN(osc_real_create(exp, oa, ea, oti));
721         }
722
723         if (!fid_seq_is_mdt(oa->o_seq))
724                 RETURN(osc_real_create(exp, oa, ea, oti));
725
726         /* we should not get here anymore */
727         LBUG();
728
729         RETURN(rc);
730 }
731
732 /* Destroy requests can be async always on the client, and we don't even really
733  * care about the return code since the client cannot do anything at all about
734  * a destroy failure.
735  * When the MDS is unlinking a filename, it saves the file objects into a
736  * recovery llog, and these object records are cancelled when the OST reports
737  * they were destroyed and sync'd to disk (i.e. transaction committed).
738  * If the client dies, or the OST is down when the object should be destroyed,
739  * the records are not cancelled, and when the OST reconnects to the MDS next,
740  * it will retrieve the llog unlink logs and then sends the log cancellation
741  * cookies to the MDS after committing destroy transactions. */
742 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
743                        struct obdo *oa, struct lov_stripe_md *ea,
744                        struct obd_trans_info *oti, struct obd_export *md_export,
745                        void *capa)
746 {
747         struct client_obd     *cli = &exp->exp_obd->u.cli;
748         struct ptlrpc_request *req;
749         struct ost_body       *body;
750         CFS_LIST_HEAD(cancels);
751         int rc, count;
752         ENTRY;
753
754         if (!oa) {
755                 CDEBUG(D_INFO, "oa NULL\n");
756                 RETURN(-EINVAL);
757         }
758
759         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
760                                         LDLM_FL_DISCARD_DATA);
761
762         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
763         if (req == NULL) {
764                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
765                 RETURN(-ENOMEM);
766         }
767
768         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
769         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
770                                0, &cancels, count);
771         if (rc) {
772                 ptlrpc_request_free(req);
773                 RETURN(rc);
774         }
775
776         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
777         ptlrpc_at_set_req_timeout(req);
778
779         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
780                 oa->o_lcookie = *oti->oti_logcookies;
781         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
782         LASSERT(body);
783         lustre_set_wire_obdo(&body->oa, oa);
784
785         osc_pack_capa(req, body, (struct obd_capa *)capa);
786         ptlrpc_request_set_replen(req);
787
788         /* If osc_destory is for destroying the unlink orphan,
789          * sent from MDT to OST, which should not be blocked here,
790          * because the process might be triggered by ptlrpcd, and
791          * it is not good to block ptlrpcd thread (b=16006)*/
792         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
793                 req->rq_interpret_reply = osc_destroy_interpret;
794                 if (!osc_can_send_destroy(cli)) {
795                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
796                                                           NULL);
797
798                         /*
799                          * Wait until the number of on-going destroy RPCs drops
800                          * under max_rpc_in_flight
801                          */
802                         l_wait_event_exclusive(cli->cl_destroy_waitq,
803                                                osc_can_send_destroy(cli), &lwi);
804                 }
805         }
806
807         /* Do not wait for response */
808         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
809         RETURN(0);
810 }
811
/* Fill the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client_obd state, under the loi list lock,
 * so the server learns how much cache/grant this client holds.  The
 * OBD_MD_FLBLOCKS|OBD_MD_FLGRANT bits must not already be set in o_valid.
 * @writing_bytes is currently unused here. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        /* Sanity-check the accounting; on any inconsistency report no
         * remaining dirty headroom (o_undirty = 0) rather than garbage. */
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1){
                /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Headroom: the larger of dirty_max and what a full RPC
                 * pipeline could carry. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant is reported exactly once. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
854
855 void osc_update_next_shrink(struct client_obd *cli)
856 {
857         cli->cl_next_shrink_grant =
858                 cfs_time_shift(cli->cl_grant_shrink_interval);
859         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
860                cli->cl_next_shrink_grant);
861 }
862
/* Add @grant back to the locally available grant, under the
 * cl_loi_list_lock that protects cl_avail_grant. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
869
870 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
871 {
872         if (body->oa.o_valid & OBD_MD_FLGRANT) {
873                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
874                 __osc_update_grant(cli, body->oa.o_grant);
875         }
876 }
877
878 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
879                               obd_count keylen, void *key, obd_count vallen,
880                               void *val, struct ptlrpc_request_set *set);
881
882 static int osc_shrink_grant_interpret(const struct lu_env *env,
883                                       struct ptlrpc_request *req,
884                                       void *aa, int rc)
885 {
886         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
887         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
888         struct ost_body *body;
889
890         if (rc != 0) {
891                 __osc_update_grant(cli, oa->o_grant);
892                 GOTO(out, rc);
893         }
894
895         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
896         LASSERT(body);
897         osc_update_grant(cli, body);
898 out:
899         OBDO_FREE(oa);
900         return rc;
901 }
902
903 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
904 {
905         client_obd_list_lock(&cli->cl_loi_list_lock);
906         oa->o_grant = cli->cl_avail_grant / 4;
907         cli->cl_avail_grant -= oa->o_grant;
908         client_obd_list_unlock(&cli->cl_loi_list_lock);
909         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
910                 oa->o_valid |= OBD_MD_FLFLAGS;
911                 oa->o_flags = 0;
912         }
913         oa->o_flags |= OBD_FL_SHRINK_GRANT;
914         osc_update_next_shrink(cli);
915 }
916
917 /* Shrink the current grant, either from some large amount to enough for a
918  * full set of in-flight RPCs, or if we have already shrunk to that limit
919  * then to enough for a single RPC.  This avoids keeping more grant than
920  * needed, and avoids shrinking the grant piecemeal. */
921 static int osc_shrink_grant(struct client_obd *cli)
922 {
923         long target = (cli->cl_max_rpcs_in_flight + 1) *
924                       cli->cl_max_pages_per_rpc;
925
926         client_obd_list_lock(&cli->cl_loi_list_lock);
927         if (cli->cl_avail_grant <= target)
928                 target = cli->cl_max_pages_per_rpc;
929         client_obd_list_unlock(&cli->cl_loi_list_lock);
930
931         return osc_shrink_grant_to_target(cli, target);
932 }
933
/* Release grant back to the server so that cl_avail_grant drops to
 * @target.  @target is clamped up to cl_max_pages_per_rpc, and nothing
 * is done if we already hold no more than @target.  The released amount
 * is sent via a KEY_GRANT_SHRINK set_info RPC; if that fails the grant
 * is re-added locally.
 *
 * NOTE(review): @target is compared directly against cl_avail_grant,
 * yet the caller osc_shrink_grant() computes it from page counts while
 * grant is accounted in bytes elsewhere in this file - confirm the
 * intended units. */
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* the difference is what we offer back to the server */
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* server rejected the shrink: take the grant back */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
978
979 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
980 static int osc_should_shrink_grant(struct client_obd *client)
981 {
982         cfs_time_t time = cfs_time_current();
983         cfs_time_t next_shrink = client->cl_next_shrink_grant;
984
985         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
986              OBD_CONNECT_GRANT_SHRINK) == 0)
987                 return 0;
988
989         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
990                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
991                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
992                         return 1;
993                 else
994                         osc_update_next_shrink(client);
995         }
996         return 0;
997 }
998
/* Periodic TIMEOUT_GRANT callback: walk every client registered on the
 * timeout item and shrink its grant where warranted. */
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}
1010
1011 static int osc_add_shrink_grant(struct client_obd *client)
1012 {
1013         int rc;
1014
1015         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1016                                        TIMEOUT_GRANT,
1017                                        osc_grant_shrink_grant_cb, NULL,
1018                                        &client->cl_grant_shrink_list);
1019         if (rc) {
1020                 CERROR("add grant client %s error %d\n",
1021                         client->cl_import->imp_obd->obd_name, rc);
1022                 return rc;
1023         }
1024         CDEBUG(D_CACHE, "add grant client %s \n",
1025                client->cl_import->imp_obd->obd_name);
1026         osc_update_next_shrink(client);
1027         return 0;
1028 }
1029
/* Unregister this client from the TIMEOUT_GRANT periodic shrink list;
 * inverse of osc_add_shrink_grant(). */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1035
/* Initialize grant accounting from the server's connect data.  Runs at
 * connect/reconnect; also registers the periodic grant-shrink callback
 * the first time a server advertising OBD_CONNECT_GRANT_SHRINK is seen. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty will drop
         * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        /* negative grant means the server granted less than we have dirty;
         * known to happen against unpatched 1.6 servers (bug20278) */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with patch from bug20278 (%ld) \n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1073
1074 /* We assume that the reason this OSC got a short read is because it read
1075  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1076  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1077  * this stripe never got written at or beyond this stripe offset yet. */
1078 static void handle_short_read(int nob_read, obd_count page_count,
1079                               struct brw_page **pga)
1080 {
1081         char *ptr;
1082         int i = 0;
1083
1084         /* skip bytes read OK */
1085         while (nob_read > 0) {
1086                 LASSERT (page_count > 0);
1087
1088                 if (pga[i]->count > nob_read) {
1089                         /* EOF inside this page */
1090                         ptr = cfs_kmap(pga[i]->pg) +
1091                                 (pga[i]->off & ~CFS_PAGE_MASK);
1092                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1093                         cfs_kunmap(pga[i]->pg);
1094                         page_count--;
1095                         i++;
1096                         break;
1097                 }
1098
1099                 nob_read -= pga[i]->count;
1100                 page_count--;
1101                 i++;
1102         }
1103
1104         /* zero remaining pages */
1105         while (page_count-- > 0) {
1106                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1107                 memset(ptr, 0, pga[i]->count);
1108                 cfs_kunmap(pga[i]->pg);
1109                 i++;
1110         }
1111 }
1112
1113 static int check_write_rcs(struct ptlrpc_request *req,
1114                            int requested_nob, int niocount,
1115                            obd_count page_count, struct brw_page **pga)
1116 {
1117         int     i;
1118         __u32   *remote_rcs;
1119
1120         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1121                                                   sizeof(*remote_rcs) *
1122                                                   niocount);
1123         if (remote_rcs == NULL) {
1124                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1125                 return(-EPROTO);
1126         }
1127
1128         /* return error if any niobuf was in error */
1129         for (i = 0; i < niocount; i++) {
1130                 if ((int)remote_rcs[i] < 0)
1131                         return(remote_rcs[i]);
1132
1133                 if (remote_rcs[i] != 0) {
1134                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1135                                 i, remote_rcs[i], req);
1136                         return(-EPROTO);
1137                 }
1138         }
1139
1140         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1141                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1142                        req->rq_bulk->bd_nob_transferred, requested_nob);
1143                 return(-EPROTO);
1144         }
1145
1146         return (0);
1147 }
1148
1149 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1150 {
1151         if (p1->flag != p2->flag) {
1152                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1153                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1154
1155                 /* warn if we try to combine flags that we don't know to be
1156                  * safe to combine */
1157                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1158                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1159                               "report this at http://bugs.whamcloud.com/\n",
1160                               p1->flag, p2->flag);
1161                 }
1162                 return 0;
1163         }
1164
1165         return (p1->off + p1->count == p2->off);
1166 }
1167
1168 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1169                                    struct brw_page **pga, int opc,
1170                                    cksum_type_t cksum_type)
1171 {
1172         __u32                           cksum;
1173         int                             i = 0;
1174         struct cfs_crypto_hash_desc     *hdesc;
1175         unsigned int                    bufsize;
1176         int                             err;
1177         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1178
1179         LASSERT(pg_count > 0);
1180
1181         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1182         if (IS_ERR(hdesc)) {
1183                 CERROR("Unable to initialize checksum hash %s\n",
1184                        cfs_crypto_hash_name(cfs_alg));
1185                 return PTR_ERR(hdesc);
1186         }
1187
1188         while (nob > 0 && pg_count > 0) {
1189                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1190
1191                 /* corrupt the data before we compute the checksum, to
1192                  * simulate an OST->client data error */
1193                 if (i == 0 && opc == OST_READ &&
1194                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1195                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1196                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1197                         memcpy(ptr + off, "bad1", min(4, nob));
1198                         cfs_kunmap(pga[i]->pg);
1199                 }
1200                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1201                                   pga[i]->off & ~CFS_PAGE_MASK,
1202                                   count);
1203                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1204                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1205
1206                 nob -= pga[i]->count;
1207                 pg_count--;
1208                 i++;
1209         }
1210
1211         bufsize = 4;
1212         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1213
1214         if (err)
1215                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1216
1217         /* For sending we only compute the wrong checksum instead
1218          * of corrupting the data so it is still correct on a redo */
1219         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1220                 cksum++;
1221
1222         return cksum;
1223 }
1224
/* Build (but do not send) a bulk read/write RPC covering @page_count
 * pages described by @pga.  On success the prepared request is returned
 * through @reqp with an osc_brw_async_args cookie installed in
 * rq_async_args; the caller owns sending and completion.  @reserve
 * non-zero takes a capa reference for the life of the request; @resend
 * marks the RPC with OBD_FL_RECOV_RESEND so the server can recognize a
 * replayed BRW. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        /* fault-injection points exercised by the test suite */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                /* writes allocate from the import's request pool so dirty
                 * data can still be flushed under memory pressure */
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count remote niobufs: adjacent mergeable pages share one niobuf */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf rather than start a
                         * new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                /* NOTE(review): masks oa->o_flags instead of
                                 * zeroing it as the read path below does with
                                 * body->oa.o_flags - presumably to keep local
                                 * flag bits across a resend; confirm intent */
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1428
1429 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1430                                 __u32 client_cksum, __u32 server_cksum, int nob,
1431                                 obd_count page_count, struct brw_page **pga,
1432                                 cksum_type_t client_cksum_type)
1433 {
1434         __u32 new_cksum;
1435         char *msg;
1436         cksum_type_t cksum_type;
1437
1438         if (server_cksum == client_cksum) {
1439                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1440                 return 0;
1441         }
1442
1443         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1444                                        oa->o_flags : 0);
1445         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1446                                       cksum_type);
1447
1448         if (cksum_type != client_cksum_type)
1449                 msg = "the server did not use the checksum type specified in "
1450                       "the original request - likely a protocol problem";
1451         else if (new_cksum == server_cksum)
1452                 msg = "changed on the client after we checksummed it - "
1453                       "likely false positive due to mmap IO (bug 11742)";
1454         else if (new_cksum == client_cksum)
1455                 msg = "changed in transit before arrival at OST";
1456         else
1457                 msg = "changed in transit AND doesn't match the original - "
1458                       "likely false positive due to mmap IO (bug 11742)";
1459
1460         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1461                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1462                            msg, libcfs_nid2str(peer->nid),
1463                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1464                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1465                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1466                            oa->o_id,
1467                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1468                            pga[0]->off,
1469                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1470         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1471                "client csum now %x\n", client_cksum, client_cksum_type,
1472                server_cksum, cksum_type, new_cksum);
1473         return 1;
1474 }
1475
1476 /* Note rc enters this function as number of bytes transferred */
1477 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1478 {
1479         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1480         const lnet_process_id_t *peer =
1481                         &req->rq_import->imp_connection->c_peer;
1482         struct client_obd *cli = aa->aa_cli;
1483         struct ost_body *body;
1484         __u32 client_cksum = 0;
1485         ENTRY;
1486
1487         if (rc < 0 && rc != -EDQUOT) {
1488                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1489                 RETURN(rc);
1490         }
1491
1492         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1493         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1494         if (body == NULL) {
1495                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1496                 RETURN(-EPROTO);
1497         }
1498
1499         /* set/clear over quota flag for a uid/gid */
1500         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1501             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1502                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1503
1504                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1505                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1506                        body->oa.o_flags);
1507                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1508         }
1509
1510         osc_update_grant(cli, body);
1511
1512         if (rc < 0)
1513                 RETURN(rc);
1514
1515         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1516                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1517
1518         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1519                 if (rc > 0) {
1520                         CERROR("Unexpected +ve rc %d\n", rc);
1521                         RETURN(-EPROTO);
1522                 }
1523                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1524
1525                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1526                         RETURN(-EAGAIN);
1527
1528                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1529                     check_write_checksum(&body->oa, peer, client_cksum,
1530                                          body->oa.o_cksum, aa->aa_requested_nob,
1531                                          aa->aa_page_count, aa->aa_ppga,
1532                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1533                         RETURN(-EAGAIN);
1534
1535                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1536                                      aa->aa_page_count, aa->aa_ppga);
1537                 GOTO(out, rc);
1538         }
1539
1540         /* The rest of this function executes only for OST_READs */
1541
1542         /* if unwrap_bulk failed, return -EAGAIN to retry */
1543         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1544         if (rc < 0)
1545                 GOTO(out, rc = -EAGAIN);
1546
1547         if (rc > aa->aa_requested_nob) {
1548                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1549                        aa->aa_requested_nob);
1550                 RETURN(-EPROTO);
1551         }
1552
1553         if (rc != req->rq_bulk->bd_nob_transferred) {
1554                 CERROR ("Unexpected rc %d (%d transferred)\n",
1555                         rc, req->rq_bulk->bd_nob_transferred);
1556                 return (-EPROTO);
1557         }
1558
1559         if (rc < aa->aa_requested_nob)
1560                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1561
1562         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1563                 static int cksum_counter;
1564                 __u32      server_cksum = body->oa.o_cksum;
1565                 char      *via;
1566                 char      *router;
1567                 cksum_type_t cksum_type;
1568
1569                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1570                                                body->oa.o_flags : 0);
1571                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1572                                                  aa->aa_ppga, OST_READ,
1573                                                  cksum_type);
1574
1575                 if (peer->nid == req->rq_bulk->bd_sender) {
1576                         via = router = "";
1577                 } else {
1578                         via = " via ";
1579                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1580                 }
1581
1582                 if (server_cksum == ~0 && rc > 0) {
1583                         CERROR("Protocol error: server %s set the 'checksum' "
1584                                "bit, but didn't send a checksum.  Not fatal, "
1585                                "but please notify on http://bugs.whamcloud.com/\n",
1586                                libcfs_nid2str(peer->nid));
1587                 } else if (server_cksum != client_cksum) {
1588                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1589                                            "%s%s%s inode "DFID" object "
1590                                            LPU64"/"LPU64" extent "
1591                                            "["LPU64"-"LPU64"]\n",
1592                                            req->rq_import->imp_obd->obd_name,
1593                                            libcfs_nid2str(peer->nid),
1594                                            via, router,
1595                                            body->oa.o_valid & OBD_MD_FLFID ?
1596                                                 body->oa.o_parent_seq : (__u64)0,
1597                                            body->oa.o_valid & OBD_MD_FLFID ?
1598                                                 body->oa.o_parent_oid : 0,
1599                                            body->oa.o_valid & OBD_MD_FLFID ?
1600                                                 body->oa.o_parent_ver : 0,
1601                                            body->oa.o_id,
1602                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1603                                                 body->oa.o_seq : (__u64)0,
1604                                            aa->aa_ppga[0]->off,
1605                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1606                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1607                                                                         1);
1608                         CERROR("client %x, server %x, cksum_type %x\n",
1609                                client_cksum, server_cksum, cksum_type);
1610                         cksum_counter = 0;
1611                         aa->aa_oa->o_cksum = client_cksum;
1612                         rc = -EAGAIN;
1613                 } else {
1614                         cksum_counter++;
1615                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1616                         rc = 0;
1617                 }
1618         } else if (unlikely(client_cksum)) {
1619                 static int cksum_missed;
1620
1621                 cksum_missed++;
1622                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1623                         CERROR("Checksum %u requested from %s but not sent\n",
1624                                cksum_missed, libcfs_nid2str(peer->nid));
1625         } else {
1626                 rc = 0;
1627         }
1628 out:
1629         if (rc >= 0)
1630                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1631
1632         RETURN(rc);
1633 }
1634
/* Synchronously read or write (@cmd) @page_count pages described by @pga
 * from/to the object identified by @oa/@lsm through export @exp, waiting
 * for completion.
 *
 * Bulk timeouts are retried immediately; recoverable errors are retried
 * with an increasing delay.  A server-returned -EINPROGRESS is always
 * retried, other recoverable errors only until client_should_resend()
 * gives up or the import generation changes (eviction).  Returns 0 on
 * success or a negative errno; -EAGAIN/-EINPROGRESS are folded into
 * -EIO before returning to the caller.
 */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    generation, resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);
        /* remember the import generation so resends across an eviction
         * can be detected and refused below */
        generation = exp->exp_obd->u.cli.cl_import->imp_generation;

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0, resends);
        if (rc != 0)
                return (rc);

        if (resends) {
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                /* delay the resend by @resends seconds */
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already.*/
        if (osc_recoverable_error(rc)) {
                resends++;
                if (rc != -EINPROGRESS &&
                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("%s: too many resend retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
                        goto out;
                }
                if (generation !=
                    exp->exp_obd->u.cli.cl_import->imp_generation) {
                        /* the import was evicted and reconnected while the
                         * request was in flight; give up rather than resend
                         * across the eviction */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
                        goto out;
                }

                /* back off for @resends seconds before trying again */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
                                       NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
out:
        if (rc == -EAGAIN || rc == -EINPROGRESS)
                rc = -EIO;
        RETURN (rc);
}
1704
/* Rebuild and re-issue the async BRW RPC @request after recoverable
 * error @rc.  A fresh request is prepared from the saved async args
 * @aa; the oap/extent lists and the capability are moved over to it,
 * per-oap request references are switched to the new request, and the
 * new request is handed back to ptlrpcd.
 *
 * Returns 0 on success, -EINTR if any page of the old request was
 * interrupted, or the error from request preparation. */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* bail out (dropping the new request) if any page of the old
         * request was interrupted while waiting */
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
        cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);

        /* any oap holding a reference on the old request now references
         * the new one instead */
        cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1776
1777 /*
1778  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1779  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1780  * fine for our small page arrays and doesn't require allocation.  its an
1781  * insertion sort that swaps elements that are strides apart, shrinking the
1782  * stride down until its '1' and the array is sorted.
1783  */
1784 static void sort_brw_pages(struct brw_page **array, int num)
1785 {
1786         int stride, i, j;
1787         struct brw_page *tmp;
1788
1789         if (num == 1)
1790                 return;
1791         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1792                 ;
1793
1794         do {
1795                 stride /= 3;
1796                 for (i = stride ; i < num ; i++) {
1797                         tmp = array[i];
1798                         j = i;
1799                         while (j >= stride && array[j - stride]->off > tmp->off) {
1800                                 array[j] = array[j - stride];
1801                                 j -= stride;
1802                         }
1803                         array[j] = tmp;
1804                 }
1805         } while (stride > 1);
1806 }
1807
1808 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1809 {
1810         int count = 1;
1811         int offset;
1812         int i = 0;
1813
1814         LASSERT (pages > 0);
1815         offset = pg[i]->off & ~CFS_PAGE_MASK;
1816
1817         for (;;) {
1818                 pages--;
1819                 if (pages == 0)         /* that's all */
1820                         return count;
1821
1822                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1823                         return count;   /* doesn't end on page boundary */
1824
1825                 i++;
1826                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1827                 if (offset != 0)        /* doesn't start on page boundary */
1828                         return count;
1829
1830                 count++;
1831         }
1832 }
1833
1834 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1835 {
1836         struct brw_page **ppga;
1837         int i;
1838
1839         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1840         if (ppga == NULL)
1841                 return NULL;
1842
1843         for (i = 0; i < count; i++)
1844                 ppga[i] = pga + i;
1845         return ppga;
1846 }
1847
/* Free a page pointer array of @count entries previously allocated by
 * osc_build_ppga().  @ppga must not be NULL. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1853
1854 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1855                    obd_count page_count, struct brw_page *pga,
1856                    struct obd_trans_info *oti)
1857 {
1858         struct obdo *saved_oa = NULL;
1859         struct brw_page **ppga, **orig;
1860         struct obd_import *imp = class_exp2cliimp(exp);
1861         struct client_obd *cli;
1862         int rc, page_count_orig;
1863         ENTRY;
1864
1865         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1866         cli = &imp->imp_obd->u.cli;
1867
1868         if (cmd & OBD_BRW_CHECK) {
1869                 /* The caller just wants to know if there's a chance that this
1870                  * I/O can succeed */
1871
1872                 if (imp->imp_invalid)
1873                         RETURN(-EIO);
1874                 RETURN(0);
1875         }
1876
1877         /* test_brw with a failed create can trip this, maybe others. */
1878         LASSERT(cli->cl_max_pages_per_rpc);
1879
1880         rc = 0;
1881
1882         orig = ppga = osc_build_ppga(pga, page_count);
1883         if (ppga == NULL)
1884                 RETURN(-ENOMEM);
1885         page_count_orig = page_count;
1886
1887         sort_brw_pages(ppga, page_count);
1888         while (page_count) {
1889                 obd_count pages_per_brw;
1890
1891                 if (page_count > cli->cl_max_pages_per_rpc)
1892                         pages_per_brw = cli->cl_max_pages_per_rpc;
1893                 else
1894                         pages_per_brw = page_count;
1895
1896                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1897
1898                 if (saved_oa != NULL) {
1899                         /* restore previously saved oa */
1900                         *oinfo->oi_oa = *saved_oa;
1901                 } else if (page_count > pages_per_brw) {
1902                         /* save a copy of oa (brw will clobber it) */
1903                         OBDO_ALLOC(saved_oa);
1904                         if (saved_oa == NULL)
1905                                 GOTO(out, rc = -ENOMEM);
1906                         *saved_oa = *oinfo->oi_oa;
1907                 }
1908
1909                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1910                                       pages_per_brw, ppga, oinfo->oi_capa);
1911
1912                 if (rc != 0)
1913                         break;
1914
1915                 page_count -= pages_per_brw;
1916                 ppga += pages_per_brw;
1917         }
1918
1919 out:
1920         osc_release_ppga(orig, page_count_orig);
1921
1922         if (saved_oa != NULL)
1923                 OBDO_FREE(saved_oa);
1924
1925         RETURN(rc);
1926 }
1927
/* Completion callback for asynchronous BRW RPCs (installed as
 * rq_interpret_reply by osc_build_rpc()).  Finishes the bulk transfer,
 * redoes the request on recoverable errors, propagates server-returned
 * attributes to the cl_object, completes all extents carried by the RPC,
 * updates in-flight accounting and kicks further cached I/O. */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct cl_object  *obj = NULL;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* evicted while in flight: don't resend */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
                }

                /* rc == 0 means a new request was queued; this one is done */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        /* take a reference on the object (once) while finishing all the
         * extents carried by this RPC */
        cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                if (obj == NULL && rc == 0) {
                        obj = osc2cl(ext->oe_obj);
                        cl_object_get(obj);
                }

                cfs_list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(cfs_list_empty(&aa->aa_exts));
        LASSERT(cfs_list_empty(&aa->aa_oaps));

        if (obj != NULL) {
                /* merge whichever of blocks/[mac]time the server returned
                 * into the client-side object attributes */
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;

                LASSERT(rc == 0);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }
                if (valid != 0) {
                        cl_object_attr_lock(obj);
                        cl_object_attr_set(env, obj, attr, valid);
                        cl_object_attr_unlock(obj);
                }
                cl_object_put(env, obj);
        }
        OBDO_FREE(aa->aa_oa);

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
2032
2033 /**
2034  * Build an RPC by the list of extent @ext_list. The caller must ensure
2035  * that the total pages in this list are NOT over max pages per RPC.
2036  * Extents in the list must be in OES_RPC state.
2037  */
2038 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2039                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2040 {
2041         struct ptlrpc_request *req = NULL;
2042         struct osc_extent *ext;
2043         CFS_LIST_HEAD(rpc_list);
2044         struct brw_page **pga = NULL;
2045         struct osc_brw_async_args *aa = NULL;
2046         struct obdo *oa = NULL;
2047         struct osc_async_page *oap;
2048         struct osc_async_page *tmp;
2049         struct cl_req *clerq = NULL;
2050         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2051         struct ldlm_lock *lock = NULL;
2052         struct cl_req_attr crattr;
2053         obd_off starting_offset = OBD_OBJECT_EOF;
2054         obd_off ending_offset = 0;
2055         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2056
2057         ENTRY;
2058         LASSERT(!cfs_list_empty(ext_list));
2059
2060         /* add pages into rpc_list to build BRW rpc */
2061         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2062                 LASSERT(ext->oe_state == OES_RPC);
2063                 mem_tight |= ext->oe_memalloc;
2064                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2065                         ++page_count;
2066                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2067                         if (starting_offset > oap->oap_obj_off)
2068                                 starting_offset = oap->oap_obj_off;
2069                         else
2070                                 LASSERT(oap->oap_page_off == 0);
2071                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2072                                 ending_offset = oap->oap_obj_off +
2073                                                 oap->oap_count;
2074                         else
2075                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2076                                         CFS_PAGE_SIZE);
2077                 }
2078         }
2079
2080         if (mem_tight)
2081                 mpflag = cfs_memory_pressure_get_and_set();
2082
2083         memset(&crattr, 0, sizeof crattr);
2084         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2085         if (pga == NULL)
2086                 GOTO(out, rc = -ENOMEM);
2087
2088         OBDO_ALLOC(oa);
2089         if (oa == NULL)
2090                 GOTO(out, rc = -ENOMEM);
2091
2092         i = 0;
2093         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2094                 struct cl_page *page = oap2cl_page(oap);
2095                 if (clerq == NULL) {
2096                         clerq = cl_req_alloc(env, page, crt,
2097                                              1 /* only 1-object rpcs for
2098                                                 * now */);
2099                         if (IS_ERR(clerq))
2100                                 GOTO(out, rc = PTR_ERR(clerq));
2101                         lock = oap->oap_ldlm_lock;
2102                 }
2103                 if (mem_tight)
2104                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2105                 pga[i] = &oap->oap_brw_page;
2106                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2107                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2108                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2109                 i++;
2110                 cl_req_page_add(env, clerq, page);
2111         }
2112
2113         /* always get the data for the obdo for the rpc */
2114         LASSERT(clerq != NULL);
2115         crattr.cra_oa = oa;
2116         crattr.cra_capa = NULL;
2117         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2118         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2119         if (lock) {
2120                 oa->o_handle = lock->l_remote_handle;
2121                 oa->o_valid |= OBD_MD_FLHANDLE;
2122         }
2123
2124         rc = cl_req_prep(env, clerq);
2125         if (rc != 0) {
2126                 CERROR("cl_req_prep failed: %d\n", rc);
2127                 GOTO(out, rc);
2128         }
2129
2130         sort_brw_pages(pga, page_count);
2131         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2132                         pga, &req, crattr.cra_capa, 1, 0);
2133         if (rc != 0) {
2134                 CERROR("prep_req failed: %d\n", rc);
2135                 GOTO(out, rc);
2136         }
2137
2138         req->rq_interpret_reply = brw_interpret;
2139         if (mem_tight != 0)
2140                 req->rq_memalloc = 1;
2141
2142         /* Need to update the timestamps after the request is built in case
2143          * we race with setattr (locally or in queue at OST).  If OST gets
2144          * later setattr before earlier BRW (as determined by the request xid),
2145          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2146          * way to do this in a single call.  bug 10150 */
2147         cl_req_attr_set(env, clerq, &crattr,
2148                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2149
2150         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2151
2152         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2153         aa = ptlrpc_req_async_args(req);
2154         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2155         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2156         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2157         cfs_list_splice_init(ext_list, &aa->aa_exts);
2158         aa->aa_clerq = clerq;
2159
2160         /* queued sync pages can be torn down while the pages
2161          * were between the pending list and the rpc */
2162         tmp = NULL;
2163         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2164                 /* only one oap gets a request reference */
2165                 if (tmp == NULL)
2166                         tmp = oap;
2167                 if (oap->oap_interrupted && !req->rq_intr) {
2168                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2169                                         oap, req);
2170                         ptlrpc_mark_interrupted(req);
2171                 }
2172         }
2173         if (tmp != NULL)
2174                 tmp->oap_request = ptlrpc_request_addref(req);
2175
2176         client_obd_list_lock(&cli->cl_loi_list_lock);
2177         starting_offset >>= CFS_PAGE_SHIFT;
2178         if (cmd == OBD_BRW_READ) {
2179                 cli->cl_r_in_flight++;
2180                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2181                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2182                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2183                                       starting_offset + 1);
2184         } else {
2185                 cli->cl_w_in_flight++;
2186                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2187                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2188                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2189                                       starting_offset + 1);
2190         }
2191         client_obd_list_unlock(&cli->cl_loi_list_lock);
2192
2193         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2194                   page_count, aa, cli->cl_r_in_flight,
2195                   cli->cl_w_in_flight);
2196
2197         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2198          * see which CPU/NUMA node the majority of pages were allocated
2199          * on, and try to assign the async RPC to the CPU core
2200          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2201          *
2202          * But on the other hand, we expect that multiple ptlrpcd
2203          * threads and the initial write sponsor can run in parallel,
2204          * especially when data checksum is enabled, which is CPU-bound
2205          * operation and single ptlrpcd thread cannot process in time.
2206          * So more ptlrpcd threads sharing BRW load
2207          * (with PDL_POLICY_ROUND) seems better.
2208          */
2209         ptlrpcd_add_req(req, pol, -1);
2210         rc = 0;
2211         EXIT;
2212
2213 out:
2214         if (mem_tight != 0)
2215                 cfs_memory_pressure_restore(mpflag);
2216
2217         capa_put(crattr.cra_capa);
2218         if (rc != 0) {
2219                 LASSERT(req == NULL);
2220
2221                 if (oa)
2222                         OBDO_FREE(oa);
2223                 if (pga)
2224                         OBD_FREE(pga, sizeof(*pga) * page_count);
2225                 /* this should happen rarely and is pretty bad, it makes the
2226                  * pending list not follow the dirty order */
2227                 while (!cfs_list_empty(ext_list)) {
2228                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2229                                              oe_link);
2230                         cfs_list_del_init(&ext->oe_link);
2231                         osc_extent_finish(env, ext, 0, rc);
2232                 }
2233                 if (clerq && !IS_ERR(clerq))
2234                         cl_req_completion(env, clerq, rc);
2235         }
2236         RETURN(rc);
2237 }
2238
2239 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2240                                         struct ldlm_enqueue_info *einfo)
2241 {
2242         void *data = einfo->ei_cbdata;
2243         int set = 0;
2244
2245         LASSERT(lock != NULL);
2246         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2247         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2248         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2249         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2250
2251         lock_res_and_lock(lock);
2252         cfs_spin_lock(&osc_ast_guard);
2253
2254         if (lock->l_ast_data == NULL)
2255                 lock->l_ast_data = data;
2256         if (lock->l_ast_data == data)
2257                 set = 1;
2258
2259         cfs_spin_unlock(&osc_ast_guard);
2260         unlock_res_and_lock(lock);
2261
2262         return set;
2263 }
2264
2265 static int osc_set_data_with_check(struct lustre_handle *lockh,
2266                                    struct ldlm_enqueue_info *einfo)
2267 {
2268         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2269         int set = 0;
2270
2271         if (lock != NULL) {
2272                 set = osc_set_lock_data_with_check(lock, einfo);
2273                 LDLM_LOCK_PUT(lock);
2274         } else
2275                 CERROR("lockh %p, data %p - client evicted?\n",
2276                        lockh, einfo->ei_cbdata);
2277         return set;
2278 }
2279
/* Run iterator @replace over every cached ldlm lock of the object
 * described by @lsm, passing @data through to the iterator.  The
 * iterator's verdict is ignored; always returns 0. */
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                             ldlm_iterator_t replace, void *data)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
        return 0;
}
2290
2291 /* find any ldlm lock of the inode in osc
2292  * return 0    not find
2293  *        1    find one
2294  *      < 0    error */
2295 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2296                            ldlm_iterator_t replace, void *data)
2297 {
2298         struct ldlm_res_id res_id;
2299         struct obd_device *obd = class_exp2obd(exp);
2300         int rc = 0;
2301
2302         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2303         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2304         if (rc == LDLM_ITER_STOP)
2305                 return(1);
2306         if (rc == LDLM_ITER_CONTINUE)
2307                 return(0);
2308         return(rc);
2309 }
2310
2311 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2312                             obd_enqueue_update_f upcall, void *cookie,
2313                             __u64 *flags, int agl, int rc)
2314 {
2315         int intent = *flags & LDLM_FL_HAS_INTENT;
2316         ENTRY;
2317
2318         if (intent) {
2319                 /* The request was created before ldlm_cli_enqueue call. */
2320                 if (rc == ELDLM_LOCK_ABORTED) {
2321                         struct ldlm_reply *rep;
2322                         rep = req_capsule_server_get(&req->rq_pill,
2323                                                      &RMF_DLM_REP);
2324
2325                         LASSERT(rep != NULL);
2326                         if (rep->lock_policy_res1)
2327                                 rc = rep->lock_policy_res1;
2328                 }
2329         }
2330
2331         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2332             (rc == 0)) {
2333                 *flags |= LDLM_FL_LVB_READY;
2334                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2335                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2336         }
2337
2338         /* Call the update callback. */
2339         rc = (*upcall)(cookie, rc);
2340         RETURN(rc);
2341 }
2342
/* Interpret callback for an asynchronous lock enqueue RPC: finishes the
 * ldlm side of the enqueue, runs the caller's upcall through
 * osc_enqueue_fini(), and balances all lock references taken for the
 * duration of the RPC.  The reference choreography here is deliberate —
 * see the inline comments before changing the order of operations. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* An aborted AGL (async glimpse) enqueue carries no LVB to unpack. */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above, allowing any deferred
         * blocking AST to proceed, then release the lookup reference. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2404
/* Fold a completed enqueue's LVB into the stripe's cached state.
 *
 * On ELDLM_OK the stripe's loi_lvb is refreshed and the known minimum
 * size (kms) is extended up to the end of the granted extent (a lock on
 * [x,y] justifies a kms of y + 1).  On an aborted intent enqueue
 * (glimpse) only the LVB is refreshed.  Any other outcome fails lock
 * matching so waiters do not reuse the unusable lock. */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

        if (rc == ELDLM_OK) {
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        /* kms already covers more than this lock does;
                         * leave it alone. */
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                /* LVB is in place: let pending ldlm_lock_match() callers
                 * use this lock. */
                ldlm_lock_allow_match(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                ldlm_lock_allow_match(lock);
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                rc = ELDLM_OK;
        }

        if (lock != NULL) {
                if (rc != ELDLM_OK)
                        ldlm_lock_fail_match(lock);

                LDLM_LOCK_PUT(lock);
        }
}
2448 EXPORT_SYMBOL(osc_update_enqueue);
2449
/* Sentinel request set passed by callers of osc_enqueue_base() to route
 * the RPC through the ptlrpcd daemon instead of a caller-owned set.
 * Compared by pointer value only — never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2451
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL (async glimpse) matching also accepts locks whose LVB has
         * not arrived yet; normal matches require a ready LVB. */
        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this strpe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* The matched lock carries someone else's ast_data;
                         * drop it and enqueue a fresh lock below. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                /* Intent enqueue: build the request up front so the LVB
                 * reply buffer can be sized before ldlm_cli_enqueue(). */
                CFS_LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), lockh, async);
        if (rqset) {
                if (!rc) {
                        /* Async path: stash the completion context in the
                         * request and let the interpret callback finish. */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous path: complete the enqueue inline. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2603
2604 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2605                        struct ldlm_enqueue_info *einfo,
2606                        struct ptlrpc_request_set *rqset)
2607 {
2608         struct ldlm_res_id res_id;
2609         int rc;
2610         ENTRY;
2611
2612         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2613                            oinfo->oi_md->lsm_object_seq, &res_id);
2614
2615         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2616                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2617                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2618                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2619                               rqset, rqset != NULL, 0);
2620         RETURN(rc);
2621 }
2622
/* Match an already granted extent lock on @res_id against @policy/@mode.
 * A PR request also tries PW locks (the VFS/page cache lets readers and
 * writers share one cached PW lock); when a PW lock satisfies a PR
 * request outside of LDLM_FL_TEST_LOCK, the reference is shifted from
 * PW to PR.  Returns the matched ldlm mode, or 0 when nothing usable
 * was found, or -EIO under fault injection. */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   int *flags, void *data, struct lustre_handle *lockh,
                   int unref)
{
        struct obd_device *obd = exp->exp_obd;
        int lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        /* Reject the match when the lock already carries
                         * different callback data. */
                        if (!osc_set_data_with_check(lockh, data)) {
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        /* Asked for PR but matched a PW lock: hold the
                         * reference as PR instead. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}
2666
2667 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2668 {
2669         ENTRY;
2670
2671         if (unlikely(mode == LCK_GROUP))
2672                 ldlm_lock_decref_and_cancel(lockh, mode);
2673         else
2674                 ldlm_lock_decref(lockh, mode);
2675
2676         RETURN(0);
2677 }
2678
2679 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2680                       __u32 mode, struct lustre_handle *lockh)
2681 {
2682         ENTRY;
2683         RETURN(osc_cancel_base(lockh, mode));
2684 }
2685
2686 static int osc_cancel_unused(struct obd_export *exp,
2687                              struct lov_stripe_md *lsm,
2688                              ldlm_cancel_flags_t flags,
2689                              void *opaque)
2690 {
2691         struct obd_device *obd = class_exp2obd(exp);
2692         struct ldlm_res_id res_id, *resp = NULL;
2693
2694         if (lsm != NULL) {
2695                 resp = osc_build_res_name(lsm->lsm_object_id,
2696                                           lsm->lsm_object_seq, &res_id);
2697         }
2698
2699         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2700 }
2701
2702 static int osc_statfs_interpret(const struct lu_env *env,
2703                                 struct ptlrpc_request *req,
2704                                 struct osc_async_args *aa, int rc)
2705 {
2706         struct obd_statfs *msfs;
2707         ENTRY;
2708
2709         if (rc == -EBADR)
2710                 /* The request has in fact never been sent
2711                  * due to issues at a higher level (LOV).
2712                  * Exit immediately since the caller is
2713                  * aware of the problem and takes care
2714                  * of the clean up */
2715                  RETURN(rc);
2716
2717         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2718             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2719                 GOTO(out, rc = 0);
2720
2721         if (rc != 0)
2722                 GOTO(out, rc);
2723
2724         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2725         if (msfs == NULL) {
2726                 GOTO(out, rc = -EPROTO);
2727         }
2728
2729         *aa->aa_oi->oi_osfs = *msfs;
2730 out:
2731         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2732         RETURN(rc);
2733 }
2734
/* Queue an asynchronous OST_STATFS RPC on @rqset; the reply is consumed
 * by osc_statfs_interpret(), which runs oinfo->oi_cb_up.  @max_age is
 * accepted for interface symmetry but not sent on the wire — see the
 * comment below. */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        /* Stash the caller's context for the interpret callback. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2778
/* Synchronous OST_STATFS: issue the RPC and copy the server's statfs
 * reply into @osfs.  @max_age is accepted for interface symmetry but
 * not sent on the wire — see the comment below. */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        cfs_down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        cfs_up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The import reference was only needed across the allocation. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2842
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 *
 * Copies the single-stripe layout for @lsm back to userspace at @lump;
 * returns 0 on success or a negative errno (-ENODATA no layout,
 * -EFAULT bad user pointer, -EINVAL bad magic, -ENOMEM).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (cfs_copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* Caller provided room for objects: allocate a reply big
                 * enough for its requested count and fill in stripe 0.
                 * (V1 and V3 place lmm_objects at different offsets.) */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* Header-only request: reply in place with no objects. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        /* An OSC object always has exactly one stripe. */
        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_seq = lsm->lsm_object_seq;
        lumk->lmm_stripe_count = 1;

        if (cfs_copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2905
2906
/* OSC ioctl dispatcher (obd_iocontrol method).  Pins the module for the
 * duration of the call, then dispatches on @cmd.  @karg is the
 * kernel-space obd_ioctl_data, @uarg the original user pointer (needed
 * by handlers that do their own copy_to/from_user).  Returns 0 or a
 * negative errno; unknown commands yield -ENOTTY. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        /* Prevent module unload while an ioctl is in flight. */
        if (!cfs_try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Present this single OSC as a one-target LOV config. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* obd_ioctl_getdata allocates buf; freed on every path
                 * below via obd_ioctl_freedata. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = cfs_copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        cfs_module_put(THIS_MODULE);
        return err;
}
2992
/**
 * obd_get_info() handler for the OSC: look up a value for \a key, either
 * locally or by issuing an OST_GET_INFO RPC to the target.
 *
 * \param[in]     env    execution environment (unused here)
 * \param[in]     exp    export to the OST target
 * \param[in]     keylen length of \a key in bytes
 * \param[in]     key    key identifying the requested information
 * \param[in,out] vallen size of the buffer at \a val; updated for
 *                       locally-answered keys
 * \param[in,out] val    buffer receiving the result; for KEY_FIEMAP it also
 *                       carries the input fiemap request
 * \param[in]     lsm    stripe metadata (unused here)
 *
 * \retval 0         on success
 * \retval -EFAULT   if \a vallen or \a val is NULL
 * \retval -EINVAL   for unrecognized keys
 * \retval -errno    on RPC allocation/packing/wait failure
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* Answered locally: the OSC is a single-stripe device, so a
                 * lock always maps to stripe 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* Fetch the last allocated object id from the OST. */
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* Fail fast rather than block recovery: no retry, no resend. */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* Forward a fiemap (file extent mapping) query to the OST;
                 * the same buffer is sent as input and overwritten with the
                 * server's reply. */
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* FIEMAP_VAL is sized for both directions: the request body
                 * we send and the reply buffer the server fills. */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3089
/**
 * obd_set_info_async() handler for the OSC: set a value for \a key, either
 * locally or by sending an OST_SET_INFO RPC to the target.
 *
 * Keys handled locally (no RPC): KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK.  All other keys are
 * packed into an RPC; KEY_GRANT_SHRINK is queued on the ptlrpcd set, every
 * other remote key requires a caller-supplied request \a set.
 *
 * \retval 0         on success
 * \retval -EINVAL   on bad value size, or remote key without a request set
 * \retval -ENOMEM   on request/obdo allocation failure
 * \retval -errno    on request packing failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                /* Toggle BRW data checksumming for this client. */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                /* Security policy changed; let the client adapt. */
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                /* Drop this thread's security contexts on the import. */
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                /* Attach this OSC to the shared client page cache (LRU)
                 * passed in via \a val.  May be done only once per device. */
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cfs_atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(cfs_list_empty(&cli->cl_lru_osc));
                cfs_spin_lock(&cli->cl_cache->ccc_lru_lock);
                cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                cfs_spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                /* Shrink up to half of this OSC's LRU pages, capped by the
                 * caller's target; report back how many were freed by
                 * decrementing *val. */
                struct client_obd *cli = &obd->u.cli;
                int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
                int target = *(int *)val;

                nr = osc_lru_shrink(cli, min(nr, target));
                *(int *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        if (KEY_IS(KEY_GRANT_SHRINK))
                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
        else
                req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);

        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                /* Stash a copy of the obdo in the async args; it is freed by
                 * osc_shrink_grant_interpret when the reply arrives. */
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                /* Caller drives completion through its own request set. */
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                /* Grant shrink is fire-and-forget via ptlrpcd. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
3206
3207
/* Obsolete o_llog_init hook: with LOD/OSP the OSC no longer initializes
 * llogs, so any call reaching this is a bug (LBUG panics); the hook is kept
 * only until it can be removed from obd_ops. */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        /* this code is not supposed to be used with LOD/OSP
         * to be removed soon */
        LBUG();
        return 0;
}
3216
3217 static int osc_llog_finish(struct obd_device *obd, int count)
3218 {
3219         struct llog_ctxt *ctxt;
3220
3221         ENTRY;
3222
3223         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3224         if (ctxt) {
3225                 llog_cat_close(NULL, ctxt->loc_handle);
3226                 llog_cleanup(NULL, ctxt);
3227         }
3228
3229         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3230         if (ctxt)
3231                 llog_cleanup(NULL, ctxt);
3232         RETURN(0);
3233 }
3234
3235 static int osc_reconnect(const struct lu_env *env,
3236                          struct obd_export *exp, struct obd_device *obd,
3237                          struct obd_uuid *cluuid,
3238                          struct obd_connect_data *data,
3239                          void *localdata)
3240 {
3241         struct client_obd *cli = &obd->u.cli;
3242
3243         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3244                 long lost_grant;
3245
3246                 client_obd_list_lock(&cli->cl_loi_list_lock);
3247                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3248                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3249                 lost_grant = cli->cl_lost_grant;
3250                 cli->cl_lost_grant = 0;
3251                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3252
3253                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3254                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3255                        data->ocd_version, data->ocd_grant, lost_grant);
3256         }
3257
3258         RETURN(0);
3259 }
3260
/**
 * o_disconnect hook: flush pending size-llog cancels on the last reference,
 * disconnect the export, and finally remove this client from the grant
 * shrink list (strictly after the import is gone; see BUG18662 note below).
 *
 * \retval result of client_disconnect_export()
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
3302
/**
 * o_import_event hook: react to state changes of the import to the OST.
 *
 * Grant accounting is reset on disconnect, pending pages are flushed (to
 * fail) and local DLM locks are cleaned on invalidation, grants are
 * re-initialized from the connect data on OCD, and the remaining events are
 * forwarded to the observer (typically the LOV; not verified here).
 *
 * \retval 0 or the observer/notification error code
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Connection lost: all outstanding grant is void. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* Connect data arrived: (re)initialize grant accounting. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3376
3377 /**
3378  * Determine whether the lock can be canceled before replaying the lock
3379  * during recovery, see bug16774 for detailed information.
3380  *
3381  * \retval zero the lock can't be canceled
3382  * \retval other ok to cancel
3383  */
3384 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3385 {
3386         check_res_locked(lock->l_resource);
3387
3388         /*
3389          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3390          *
3391          * XXX as a future improvement, we can also cancel unused write lock
3392          * if it doesn't have dirty data and active mmaps.
3393          */
3394         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3395             (lock->l_granted_mode == LCK_PR ||
3396              lock->l_granted_mode == LCK_CR) &&
3397             (osc_dlm_lock_pageref(lock) == 0))
3398                 RETURN(1);
3399
3400         RETURN(0);
3401 }
3402
3403 static int brw_queue_work(const struct lu_env *env, void *data)
3404 {
3405         struct client_obd *cli = data;
3406
3407         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3408
3409         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3410         RETURN(0);
3411 }
3412
/**
 * o_setup hook: initialize an OSC device.
 *
 * Sets up, in order: a ptlrpcd reference, the generic client obd, the
 * writeback work item, the quota subsystem, lprocfs entries (best effort),
 * a pre-grown request pool for BRW, the grant shrink list and the DLM
 * cancel-for-recovery callback.  Failures unwind through the goto chain in
 * reverse order of acquisition.
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct lprocfs_static_vars lvars = { 0 };
        struct client_obd          *cli = &obd->u.cli;
        void                       *handler;
        int                        rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
        lprocfs_osc_init_vars(&lvars);
        /* lprocfs registration is best effort: setup continues even if it
         * fails. */
        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
        RETURN(rc);

out_ptlrpcd_work:
        ptlrpcd_destroy_work(handler);
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3468
/**
 * o_precleanup hook: staged shutdown of the OSC device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS destroys the writeback work, the client import and the
 * lprocfs/llog state.  Other stages are ignored.
 *
 * \retval 0 or the llog cleanup error code
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                cfs_spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                cfs_spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3513
/**
 * o_cleanup hook: final teardown of the OSC device.
 *
 * Detaches this OSC from the shared client cache LRU (reverse of the
 * KEY_CACHE_SET handling in osc_set_info_async), frees the quota cache,
 * cleans the generic client obd and drops the ptlrpcd reference.
 *
 * \retval result of client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
                /* unlink from the shared LRU list under its lock before
                 * dropping our user reference */
                cfs_spin_lock(&cli->cl_cache->ccc_lru_lock);
                cfs_list_del_init(&cli->cl_lru_osc);
                cfs_spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cfs_atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3540
3541 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3542 {
3543         struct lprocfs_static_vars lvars = { 0 };
3544         int rc = 0;
3545
3546         lprocfs_osc_init_vars(&lvars);
3547
3548         switch (lcfg->lcfg_command) {
3549         default:
3550                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3551                                               lcfg, obd);
3552                 if (rc > 0)
3553                         rc = 0;
3554                 break;
3555         }
3556
3557         return(rc);
3558 }
3559
/* o_process_config hook: thin adapter over osc_process_config_base()
 * (the length argument is unused; buf carries the lustre_cfg). */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
3564
/* Method table wiring the generic obd operations to their OSC
 * implementations; connection management is delegated to the shared
 * client_* helpers, everything else to the osc_* handlers in this module. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3603
/* Shared OSC state defined elsewhere in the module: the slab-cache
 * descriptor table and the AST guard lock (initialized in osc_init below). */
extern struct lu_kmem_descr osc_caches[];
extern cfs_spinlock_t       osc_ast_guard;
extern cfs_lock_class_key_t osc_ast_guard_class;
3607
3608 int __init osc_init(void)
3609 {
3610         struct lprocfs_static_vars lvars = { 0 };
3611         int rc;
3612         ENTRY;
3613
3614         /* print an address of _any_ initialized kernel symbol from this
3615          * module, to allow debugging with gdb that doesn't support data
3616          * symbols from modules.*/
3617         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3618
3619         rc = lu_kmem_init(osc_caches);
3620
3621         lprocfs_osc_init_vars(&lvars);
3622
3623         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3624                                  LUSTRE_OSC_NAME, &osc_device_type);
3625         if (rc) {
3626                 lu_kmem_fini(osc_caches);
3627                 RETURN(rc);
3628         }
3629
3630         cfs_spin_lock_init(&osc_ast_guard);
3631         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3632
3633         RETURN(rc);
3634 }
3635
#ifdef __KERNEL__
/* Module exit: undo osc_init() — unregister the obd type, then release the
 * slab caches.  (Not marked __exit; see the original annotation.) */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register module entry/exit points through the libcfs wrapper. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif