Whamcloud - gitweb
2de2715f6265f0767b8059e7e5db10731dbf45f6
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * An file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, expect
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 void pcc_super_init(struct pcc_super *super)
116 {
117         spin_lock_init(&super->pccs_lock);
118         INIT_LIST_HEAD(&super->pccs_datasets);
119 }
120
121 /**
122  * pcc_dataset_add - Add a Cache policy to control which files need be
123  * cached and where it will be cached.
124  *
125  * @super: superblock of pcc
126  * @pathname: root path of pcc
127  * @id: HSM archive ID
128  * @projid: files with specified project ID will be cached.
129  */
130 static int
131 pcc_dataset_add(struct pcc_super *super, const char *pathname,
132                 __u32 archive_id, __u32 projid)
133 {
134         int rc;
135         struct pcc_dataset *dataset;
136         struct pcc_dataset *tmp;
137         bool found = false;
138
139         OBD_ALLOC_PTR(dataset);
140         if (dataset == NULL)
141                 return -ENOMEM;
142
143         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
144         if (unlikely(rc)) {
145                 OBD_FREE_PTR(dataset);
146                 return rc;
147         }
148         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
149         dataset->pccd_id = archive_id;
150         dataset->pccd_projid = projid;
151         atomic_set(&dataset->pccd_refcount, 1);
152
153         spin_lock(&super->pccs_lock);
154         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
155                 if (tmp->pccd_id == archive_id) {
156                         found = true;
157                         break;
158                 }
159         }
160         if (!found)
161                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
162         spin_unlock(&super->pccs_lock);
163
164         if (found) {
165                 pcc_dataset_put(dataset);
166                 rc = -EEXIST;
167         }
168
169         return rc;
170 }
171
172 struct pcc_dataset *
173 pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
174 {
175         struct pcc_dataset *dataset;
176         struct pcc_dataset *selected = NULL;
177
178         if (projid == 0 && archive_id == 0)
179                 return NULL;
180
181         /*
182          * archive ID is unique in the list, projid might be duplicate,
183          * we just return last added one as first priority.
184          */
185         spin_lock(&super->pccs_lock);
186         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
187                 if (projid && dataset->pccd_projid != projid)
188                         continue;
189                 if (archive_id && dataset->pccd_id != archive_id)
190                         continue;
191                 atomic_inc(&dataset->pccd_refcount);
192                 selected = dataset;
193                 break;
194         }
195         spin_unlock(&super->pccs_lock);
196         if (selected)
197                 CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
198                        selected->pccd_projid);
199         return selected;
200 }
201
202 void
203 pcc_dataset_put(struct pcc_dataset *dataset)
204 {
205         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
206                 path_put(&dataset->pccd_path);
207                 OBD_FREE_PTR(dataset);
208         }
209 }
210
211 static int
212 pcc_dataset_del(struct pcc_super *super, char *pathname)
213 {
214         struct list_head *l, *tmp;
215         struct pcc_dataset *dataset;
216         int rc = -ENOENT;
217
218         spin_lock(&super->pccs_lock);
219         list_for_each_safe(l, tmp, &super->pccs_datasets) {
220                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
221                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
222                         list_del(&dataset->pccd_linkage);
223                         pcc_dataset_put(dataset);
224                         rc = 0;
225                         break;
226                 }
227         }
228         spin_unlock(&super->pccs_lock);
229         return rc;
230 }
231
232 static void
233 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
234 {
235         seq_printf(m, "%s:\n", dataset->pccd_pathname);
236         seq_printf(m, "  rwid: %u\n", dataset->pccd_id);
237         seq_printf(m, "  autocache: projid=%u\n", dataset->pccd_projid);
238 }
239
240 int
241 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
242 {
243         struct pcc_dataset *dataset;
244
245         spin_lock(&super->pccs_lock);
246         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
247                 pcc_dataset_dump(dataset, m);
248         }
249         spin_unlock(&super->pccs_lock);
250         return 0;
251 }
252
253 void pcc_super_fini(struct pcc_super *super)
254 {
255         struct pcc_dataset *dataset, *tmp;
256
257         list_for_each_entry_safe(dataset, tmp,
258                                  &super->pccs_datasets, pccd_linkage) {
259                 list_del(&dataset->pccd_linkage);
260                 pcc_dataset_put(dataset);
261         }
262 }
263
264
265 static bool pathname_is_valid(const char *pathname)
266 {
267         /* Needs to be absolute path */
268         if (pathname == NULL || strlen(pathname) == 0 ||
269             strlen(pathname) >= PATH_MAX || pathname[0] != '/')
270                 return false;
271         return true;
272 }
273
274 static struct pcc_cmd *
275 pcc_cmd_parse(char *buffer, unsigned long count)
276 {
277         static struct pcc_cmd *cmd;
278         char *token;
279         char *val;
280         unsigned long tmp;
281         int rc = 0;
282
283         OBD_ALLOC_PTR(cmd);
284         if (cmd == NULL)
285                 GOTO(out, rc = -ENOMEM);
286
287         /* clear all setting */
288         if (strncmp(buffer, "clear", 5) == 0) {
289                 cmd->pccc_cmd = PCC_CLEAR_ALL;
290                 GOTO(out, rc = 0);
291         }
292
293         val = buffer;
294         token = strsep(&val, " ");
295         if (val == NULL || strlen(val) == 0)
296                 GOTO(out_free_cmd, rc = -EINVAL);
297
298         /* Type of the command */
299         if (strcmp(token, "add") == 0)
300                 cmd->pccc_cmd = PCC_ADD_DATASET;
301         else if (strcmp(token, "del") == 0)
302                 cmd->pccc_cmd = PCC_DEL_DATASET;
303         else
304                 GOTO(out_free_cmd, rc = -EINVAL);
305
306         /* Pathname of the dataset */
307         token = strsep(&val, " ");
308         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
309             !pathname_is_valid(token))
310                 GOTO(out_free_cmd, rc = -EINVAL);
311         cmd->pccc_pathname = token;
312
313         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
314                 /* archive ID */
315                 token = strsep(&val, " ");
316                 if (val == NULL)
317                         GOTO(out_free_cmd, rc = -EINVAL);
318
319                 rc = kstrtoul(token, 10, &tmp);
320                 if (rc != 0)
321                         GOTO(out_free_cmd, rc = -EINVAL);
322                 if (tmp == 0)
323                         GOTO(out_free_cmd, rc = -EINVAL);
324                 cmd->u.pccc_add.pccc_id = tmp;
325
326                 token = val;
327                 rc = kstrtoul(token, 10, &tmp);
328                 if (rc != 0)
329                         GOTO(out_free_cmd, rc = -EINVAL);
330                 if (tmp == 0)
331                         GOTO(out_free_cmd, rc = -EINVAL);
332                 cmd->u.pccc_add.pccc_projid = tmp;
333         }
334
335         goto out;
336 out_free_cmd:
337         OBD_FREE_PTR(cmd);
338 out:
339         if (rc)
340                 cmd = ERR_PTR(rc);
341         return cmd;
342 }
343
344 int pcc_cmd_handle(char *buffer, unsigned long count,
345                    struct pcc_super *super)
346 {
347         int rc = 0;
348         struct pcc_cmd *cmd;
349
350         cmd = pcc_cmd_parse(buffer, count);
351         if (IS_ERR(cmd))
352                 return PTR_ERR(cmd);
353
354         switch (cmd->pccc_cmd) {
355         case PCC_ADD_DATASET:
356                 rc = pcc_dataset_add(super, cmd->pccc_pathname,
357                                       cmd->u.pccc_add.pccc_id,
358                                       cmd->u.pccc_add.pccc_projid);
359                 break;
360         case PCC_DEL_DATASET:
361                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
362                 break;
363         case PCC_CLEAR_ALL:
364                 pcc_super_fini(super);
365                 break;
366         default:
367                 rc = -EINVAL;
368                 break;
369         }
370
371         OBD_FREE_PTR(cmd);
372         return rc;
373 }
374
375 static inline void pcc_inode_lock(struct inode *inode)
376 {
377         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
378 }
379
380 static inline void pcc_inode_unlock(struct inode *inode)
381 {
382         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
383 }
384
385 static void pcc_inode_init(struct pcc_inode *pcci)
386 {
387         atomic_set(&pcci->pcci_refcount, 0);
388         pcci->pcci_type = LU_PCC_NONE;
389 }
390
391 static void pcc_inode_fini(struct pcc_inode *pcci)
392 {
393         path_put(&pcci->pcci_path);
394         pcci->pcci_type = LU_PCC_NONE;
395         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
396 }
397
398 static void pcc_inode_get(struct pcc_inode *pcci)
399 {
400         atomic_inc(&pcci->pcci_refcount);
401 }
402
403 static void pcc_inode_put(struct pcc_inode *pcci)
404 {
405         if (atomic_dec_and_test(&pcci->pcci_refcount))
406                 pcc_inode_fini(pcci);
407 }
408
409 void pcc_inode_free(struct inode *inode)
410 {
411         struct ll_inode_info *lli = ll_i2info(inode);
412         struct pcc_inode *pcci = lli->lli_pcc_inode;
413
414         if (pcci) {
415                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
416                 pcc_inode_put(pcci);
417                 lli->lli_pcc_inode = NULL;
418         }
419 }
420
421 /*
422  * TODO:
423  * As Andreas suggested, we'd better use new layout to
424  * reduce overhead:
425  * (fid->f_oid >> 16 & oxFFFF)/FID
426  */
427 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
428 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
429 {
430         return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
431                         DFID_NOBRACE,
432                         (fid)->f_oid       & 0xFFFF,
433                         (fid)->f_oid >> 16 & 0xFFFF,
434                         (unsigned int)((fid)->f_seq       & 0xFFFF),
435                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
436                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
437                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
438                         PFID(fid));
439 }
440
441 void pcc_file_init(struct pcc_file *pccf)
442 {
443         pccf->pccf_file = NULL;
444         pccf->pccf_type = LU_PCC_NONE;
445 }
446
447 int pcc_file_open(struct inode *inode, struct file *file)
448 {
449         struct pcc_inode *pcci;
450         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
451         struct pcc_file *pccf = &fd->fd_pcc_file;
452         struct file *pcc_file;
453         struct path *path;
454         struct qstr *dname;
455         int rc = 0;
456
457         ENTRY;
458
459         if (!S_ISREG(inode->i_mode))
460                 RETURN(0);
461
462         pcc_inode_lock(inode);
463         pcci = ll_i2pcci(inode);
464         if (!pcci)
465                 GOTO(out_unlock, rc = 0);
466
467         if (atomic_read(&pcci->pcci_refcount) == 0)
468                 GOTO(out_unlock, rc = 0);
469
470         pcc_inode_get(pcci);
471         WARN_ON(pccf->pccf_file);
472
473         path = &pcci->pcci_path;
474         dname = &path->dentry->d_name;
475         CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
476                dname->name);
477 #ifdef HAVE_DENTRY_OPEN_USE_PATH
478         pcc_file = dentry_open(path, file->f_flags, current_cred());
479 #else
480         pcc_file = dentry_open(path->dentry, path->mnt,
481                                file->f_flags, current_cred());
482 #endif
483         if (IS_ERR_OR_NULL(pcc_file)) {
484                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
485                 pcc_inode_put(pcci);
486         } else {
487                 pccf->pccf_file = pcc_file;
488                 pccf->pccf_type = pcci->pcci_type;
489         }
490
491 out_unlock:
492         pcc_inode_unlock(inode);
493         RETURN(rc);
494 }
495
496 void pcc_file_release(struct inode *inode, struct file *file)
497 {
498         struct pcc_inode *pcci;
499         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
500         struct pcc_file *pccf;
501         struct path *path;
502         struct qstr *dname;
503
504         ENTRY;
505
506         if (!S_ISREG(inode->i_mode) || fd == NULL)
507                 RETURN_EXIT;
508
509         pccf = &fd->fd_pcc_file;
510         pcc_inode_lock(inode);
511         if (pccf->pccf_file == NULL)
512                 goto out;
513
514         pcci = ll_i2pcci(inode);
515         LASSERT(pcci);
516         path = &pcci->pcci_path;
517         dname = &path->dentry->d_name;
518         CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
519                dname->name);
520         pcc_inode_put(pcci);
521         fput(pccf->pccf_file);
522         pccf->pccf_file = NULL;
523 out:
524         pcc_inode_unlock(inode);
525 }
526
527 ssize_t pcc_file_read_iter(struct kiocb *iocb,
528                            struct iov_iter *iter, bool *cached)
529 {
530         struct file *file = iocb->ki_filp;
531         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
532         struct pcc_file *pccf = &fd->fd_pcc_file;
533         ssize_t result;
534
535         ENTRY;
536
537         if (pccf->pccf_file == NULL) {
538                 *cached = false;
539                 RETURN(0);
540         }
541         *cached = true;
542         iocb->ki_filp = pccf->pccf_file;
543
544         result = generic_file_read_iter(iocb, iter);
545         iocb->ki_filp = file;
546
547         RETURN(result);
548 }
549
550 static ssize_t
551 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
552 {
553         struct file *file = iocb->ki_filp;
554
555 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
556         return file->f_op->write_iter(iocb, iter);
557 #else
558         struct iovec iov;
559         struct iov_iter i;
560         ssize_t bytes = 0;
561
562         iov_for_each(iov, i, *iter) {
563                 ssize_t res;
564
565                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
566                 if (-EIOCBQUEUED == res)
567                         res = wait_on_sync_kiocb(iocb);
568                 if (res <= 0) {
569                         if (bytes == 0)
570                                 bytes = res;
571                         break;
572                 }
573
574                 bytes += res;
575                 if (res < iov.iov_len)
576                         break;
577         }
578
579         if (bytes > 0)
580                 iov_iter_advance(iter, bytes);
581         return bytes;
582 #endif
583 }
584
585 ssize_t pcc_file_write_iter(struct kiocb *iocb,
586                             struct iov_iter *iter, bool *cached)
587 {
588         struct file *file = iocb->ki_filp;
589         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
590         struct pcc_file *pccf = &fd->fd_pcc_file;
591         ssize_t result;
592
593         ENTRY;
594
595         if (pccf->pccf_file == NULL) {
596                 *cached = false;
597                 RETURN(0);
598         }
599         *cached = true;
600
601         if (pccf->pccf_type != LU_PCC_READWRITE)
602                 RETURN(-EWOULDBLOCK);
603
604         iocb->ki_filp = pccf->pccf_file;
605
606         /* Since __pcc_file_write_iter makes write calls via
607          * the normal vfs interface to the local PCC file system,
608          * the inode lock is not needed.
609          */
610         result = __pcc_file_write_iter(iocb, iter);
611         iocb->ki_filp = file;
612         RETURN(result);
613 }
614
615 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
616                       bool *cached)
617 {
618         int rc;
619         struct pcc_inode *pcci;
620         struct iattr attr2 = *attr;
621         struct dentry *pcc_dentry;
622
623         ENTRY;
624
625         if (!S_ISREG(inode->i_mode)) {
626                 *cached = false;
627                 RETURN(0);
628         }
629
630         pcc_inode_lock(inode);
631         pcci = ll_i2pcci(inode);
632         if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
633                 GOTO(out_unlock, rc = 0);
634
635         *cached = true;
636         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
637                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
638                          ATTR_CTIME);
639         pcc_dentry = pcci->pcci_path.dentry;
640         inode_lock(pcc_dentry->d_inode);
641         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
642         inode_unlock(pcc_dentry->d_inode);
643 out_unlock:
644         pcc_inode_unlock(inode);
645         RETURN(rc);
646 }
647
648 int pcc_inode_getattr(struct inode *inode, bool *cached)
649 {
650         struct ll_inode_info *lli = ll_i2info(inode);
651         struct pcc_inode *pcci;
652         struct kstat stat;
653         s64 atime;
654         s64 mtime;
655         s64 ctime;
656         int rc;
657
658         ENTRY;
659
660         if (!S_ISREG(inode->i_mode)) {
661                 *cached = false;
662                 RETURN(0);
663         }
664
665         pcc_inode_lock(inode);
666         pcci = ll_i2pcci(inode);
667         if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
668                 GOTO(out_unlock, rc = 0);
669
670         *cached = true;
671         rc = ll_vfs_getattr(&pcci->pcci_path, &stat);
672         if (rc)
673                 GOTO(out_unlock, rc);
674
675         ll_inode_size_lock(inode);
676         if (inode->i_atime.tv_sec < lli->lli_atime ||
677             lli->lli_update_atime) {
678                 inode->i_atime.tv_sec = lli->lli_atime;
679                 lli->lli_update_atime = 0;
680         }
681         inode->i_mtime.tv_sec = lli->lli_mtime;
682         inode->i_ctime.tv_sec = lli->lli_ctime;
683
684         atime = inode->i_atime.tv_sec;
685         mtime = inode->i_mtime.tv_sec;
686         ctime = inode->i_ctime.tv_sec;
687
688         if (atime < stat.atime.tv_sec)
689                 atime = stat.atime.tv_sec;
690
691         if (ctime < stat.ctime.tv_sec)
692                 ctime = stat.ctime.tv_sec;
693
694         if (mtime < stat.mtime.tv_sec)
695                 mtime = stat.mtime.tv_sec;
696
697         i_size_write(inode, stat.size);
698         inode->i_blocks = stat.blocks;
699
700         inode->i_atime.tv_sec = atime;
701         inode->i_mtime.tv_sec = mtime;
702         inode->i_ctime.tv_sec = ctime;
703
704         ll_inode_size_unlock(inode);
705
706 out_unlock:
707         pcc_inode_unlock(inode);
708         RETURN(rc);
709 }
710
711 /* Create directory under base if directory does not exist */
712 static struct dentry *
713 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
714 {
715         int rc;
716         struct dentry *dentry;
717         struct inode *dir = base->d_inode;
718
719         inode_lock(dir);
720         dentry = lookup_one_len(name, base, strlen(name));
721         if (IS_ERR(dentry))
722                 goto out;
723
724         if (d_is_positive(dentry))
725                 goto out;
726
727         rc = vfs_mkdir(dir, dentry, mode);
728         if (rc) {
729                 dput(dentry);
730                 dentry = ERR_PTR(rc);
731                 goto out;
732         }
733 out:
734         inode_unlock(dir);
735         return dentry;
736 }
737
738 static struct dentry *
739 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
740 {
741         char *ptr, *entry_name;
742         struct dentry *parent;
743         struct dentry *child = ERR_PTR(-EINVAL);
744
745         ptr = path;
746         while (*ptr == '/')
747                 ptr++;
748
749         entry_name = ptr;
750         parent = dget(root);
751         while ((ptr = strchr(ptr, '/')) != NULL) {
752                 *ptr = '\0';
753                 child = pcc_mkdir(parent, entry_name, mode);
754                 *ptr = '/';
755                 if (IS_ERR(child))
756                         break;
757                 dput(parent);
758                 parent = child;
759                 ptr++;
760                 entry_name = ptr;
761         }
762
763         return child;
764 }
765
766 /* Create file under base. If file already exist, return failure */
767 static struct dentry *
768 pcc_create(struct dentry *base, const char *name, umode_t mode)
769 {
770         int rc;
771         struct dentry *dentry;
772         struct inode *dir = base->d_inode;
773
774         inode_lock(dir);
775         dentry = lookup_one_len(name, base, strlen(name));
776         if (IS_ERR(dentry))
777                 goto out;
778
779         if (d_is_positive(dentry))
780                 goto out;
781
782         rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
783         if (rc) {
784                 dput(dentry);
785                 dentry = ERR_PTR(rc);
786                 goto out;
787         }
788 out:
789         inode_unlock(dir);
790         return dentry;
791 }
792
793 /* Must be called with pcci->pcci_lock held */
794 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
795                                   struct pcc_inode *pcci,
796                                   struct dentry *dentry,
797                                   enum lu_pcc_type type)
798 {
799         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
800         pcci->pcci_path.dentry = dentry;
801         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
802         atomic_set(&pcci->pcci_refcount, 1);
803         pcci->pcci_type = type;
804         pcci->pcci_attr_valid = false;
805 }
806
807 static int __pcc_inode_create(struct pcc_dataset *dataset,
808                               struct lu_fid *fid,
809                               struct dentry **dentry)
810 {
811         char *path;
812         struct dentry *base;
813         struct dentry *child;
814         int rc = 0;
815
816         OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
817         if (path == NULL)
818                 return -ENOMEM;
819
820         pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
821
822         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0700);
823         if (IS_ERR(base)) {
824                 rc = PTR_ERR(base);
825                 GOTO(out, rc);
826         }
827
828         snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
829         child = pcc_create(base, path, 0600);
830         if (IS_ERR(child)) {
831                 rc = PTR_ERR(child);
832                 GOTO(out_base, rc);
833         }
834         *dentry = child;
835
836 out_base:
837         dput(base);
838 out:
839         OBD_FREE(path, MAX_PCC_DATABASE_PATH);
840         return rc;
841 }
842
843 int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid,
844                      struct dentry **pcc_dentry)
845 {
846         return __pcc_inode_create(dataset, fid, pcc_dentry);
847 }
848
849 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
850                           struct dentry *pcc_dentry)
851 {
852         struct ll_inode_info *lli = ll_i2info(inode);
853         struct pcc_inode *pcci;
854
855         ENTRY;
856
857         LASSERT(ll_i2pcci(inode) == NULL);
858         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
859         if (pcci == NULL)
860                 RETURN(-ENOMEM);
861
862         pcc_inode_init(pcci);
863         pcc_inode_lock(inode);
864         pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
865         lli->lli_pcc_inode = pcci;
866         pcc_inode_unlock(inode);
867
868         RETURN(0);
869 }
870
871 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
872                           loff_t *offset)
873 {
874         while (count > 0) {
875                 ssize_t size;
876
877                 size = vfs_write(filp, (const void __user *)buf, count, offset);
878                 if (size < 0)
879                         return size;
880                 count -= size;
881                 buf += size;
882         }
883         return 0;
884 }
885
886 static int pcc_copy_data(struct file *src, struct file *dst)
887 {
888         int rc = 0;
889         ssize_t rc2;
890         mm_segment_t oldfs;
891         loff_t pos, offset = 0;
892         size_t buf_len = 1048576;
893         void *buf;
894
895         ENTRY;
896
897         OBD_ALLOC_LARGE(buf, buf_len);
898         if (buf == NULL)
899                 RETURN(-ENOMEM);
900
901         oldfs = get_fs();
902         set_fs(KERNEL_DS);
903         while (1) {
904                 pos = offset;
905                 rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
906                 if (rc2 < 0)
907                         GOTO(out_fs, rc = rc2);
908                 else if (rc2 == 0)
909                         break;
910
911                 pos = offset;
912                 rc = pcc_filp_write(dst, buf, rc2, &pos);
913                 if (rc < 0)
914                         GOTO(out_fs, rc);
915                 offset += rc2;
916         }
917
918 out_fs:
919         set_fs(oldfs);
920         OBD_FREE_LARGE(buf, buf_len);
921         RETURN(rc);
922 }
923
924 int pcc_readwrite_attach(struct file *file, struct inode *inode,
925                          __u32 archive_id)
926 {
927         struct pcc_dataset *dataset;
928         struct ll_inode_info *lli = ll_i2info(inode);
929         struct pcc_inode *pcci;
930         struct dentry *dentry;
931         struct file *pcc_filp;
932         struct path path;
933         int rc;
934
935         ENTRY;
936
937         pcc_inode_lock(inode);
938         pcci = ll_i2pcci(inode);
939         if (pcci == NULL) {
940                 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
941                 if (pcci == NULL) {
942                         pcc_inode_unlock(inode);
943                         RETURN(-ENOMEM);
944                 }
945
946                 pcc_inode_init(pcci);
947         } else if (atomic_read(&pcci->pcci_refcount) > 0) {
948                 pcc_inode_unlock(inode);
949                 RETURN(-EEXIST);
950         }
951         pcc_inode_unlock(inode);
952
953         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
954                                   archive_id);
955         if (dataset == NULL)
956                 GOTO(out_free_pcci, rc = -ENOENT);
957
958         rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
959         if (rc)
960                 GOTO(out_dataset_put, rc);
961
962         path.mnt = dataset->pccd_path.mnt;
963         path.dentry = dentry;
964 #ifdef HAVE_DENTRY_OPEN_USE_PATH
965         pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
966                                current_cred());
967 #else
968         pcc_filp = dentry_open(path.dentry, path.mnt,
969                                O_TRUNC | O_WRONLY | O_LARGEFILE,
970                                current_cred());
971 #endif
972         if (IS_ERR_OR_NULL(pcc_filp)) {
973                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
974                 GOTO(out_dentry, rc);
975         }
976
977         rc = pcc_copy_data(file, pcc_filp);
978         if (rc)
979                 GOTO(out_fput, rc);
980
981         pcc_inode_lock(inode);
982         if (lli->lli_pcc_inode)
983                 GOTO(out_unlock, rc = -EEXIST);
984         pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
985         lli->lli_pcc_inode = pcci;
986 out_unlock:
987         pcc_inode_unlock(inode);
988 out_fput:
989         fput(pcc_filp);
990 out_dentry:
991         if (rc)
992                 dput(dentry);
993 out_dataset_put:
994         pcc_dataset_put(dataset);
995 out_free_pcci:
996         if (rc)
997                 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
998         RETURN(rc);
999
1000 }
1001
1002 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
1003                               bool lease_broken, int rc, bool attached)
1004 {
1005         struct pcc_inode *pcci = ll_i2pcci(inode);
1006
1007         ENTRY;
1008
1009         if ((rc || lease_broken) && attached && pcci)
1010                 pcc_inode_put(pcci);
1011
1012         RETURN(rc);
1013 }
1014
1015 int pcc_ioctl_detach(struct inode *inode)
1016 {
1017         struct ll_inode_info *lli = ll_i2info(inode);
1018         struct pcc_inode *pcci = lli->lli_pcc_inode;
1019         int rc = 0;
1020         int count;
1021
1022         ENTRY;
1023
1024         pcc_inode_lock(inode);
1025         if (pcci == NULL)
1026                 GOTO(out_unlock, rc = 0);
1027
1028         count = atomic_read(&pcci->pcci_refcount);
1029         if (count > 1)
1030                 GOTO(out_unlock, rc = -EBUSY);
1031         else if (count == 0)
1032                 GOTO(out_unlock, rc = 0);
1033
1034         pcc_inode_put(pcci);
1035         lli->lli_pcc_inode = NULL;
1036 out_unlock:
1037         pcc_inode_unlock(inode);
1038
1039         RETURN(rc);
1040 }
1041
1042 int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state)
1043 {
1044         int rc = 0;
1045         int count;
1046         char *buf;
1047         char *path;
1048         int buf_len = sizeof(state->pccs_path);
1049         struct pcc_inode *pcci;
1050
1051         ENTRY;
1052
1053         if (buf_len <= 0)
1054                 RETURN(-EINVAL);
1055
1056         OBD_ALLOC(buf, buf_len);
1057         if (buf == NULL)
1058                 RETURN(-ENOMEM);
1059
1060         pcc_inode_lock(inode);
1061         pcci = ll_i2pcci(inode);
1062         if (pcci == NULL) {
1063                 state->pccs_type = LU_PCC_NONE;
1064                 GOTO(out_unlock, rc = 0);
1065         }
1066
1067         count = atomic_read(&pcci->pcci_refcount);
1068         if (count == 0) {
1069                 state->pccs_type = LU_PCC_NONE;
1070                 GOTO(out_unlock, rc = 0);
1071         }
1072         state->pccs_type = pcci->pcci_type;
1073         state->pccs_open_count = count - 1;
1074         state->pccs_flags = pcci->pcci_attr_valid ?
1075                             PCC_STATE_FLAG_ATTR_VALID : 0;
1076 #ifdef HAVE_DENTRY_PATH_RAW
1077         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
1078         if (IS_ERR(path))
1079                 GOTO(out_unlock, rc = PTR_ERR(path));
1080 #else
1081         path = "UNKNOWN";
1082 #endif
1083
1084         if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
1085                 GOTO(out_unlock, rc = -ENAMETOOLONG);
1086
1087 out_unlock:
1088         pcc_inode_unlock(inode);
1089         OBD_FREE(buf, buf_len);
1090         RETURN(rc);
1091 }