Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

"""Base class for types.""" 

from __future__ import absolute_import, print_function, unicode_literals 

 

import magic 

import logging 

import os 

import sys 

 

from xtract.utils import run, delete_path 

 

 

logger = logging.getLogger(__name__) 

 

 

if sys.version_info[0] < 3: 

FileExistsError = OSError 

 

 

class BaseFileType(object): 

"""Archive type base class.""" 

 

#: Name for this archive type 

name = NotImplemented 

 

#: List of possible file suffix. 

#: The first entry is considered default and used when packing new archives 

extensions = [] 

 

#: Suffix that will be appended to extracted filename if original 

#: file has none 

unknown_suffix = '-xtract' 

 

#: List of possible mime types 

mimetypes = [] 

 

def __repr__(self): 

"""Return the objects python representation.""" 

return '<{} {}>'.format(self.__class__.__name__, self.name) 

 

def __str__(self): 

"""Return the objects string representation.""" 

return str(self.name) 

 

def get_extensions(self): 

"""Make sure extensions start with a dot and yield them.""" 

for extension in self.extensions: 

47 ↛ 49line 47 didn't jump to line 49, because the condition on line 47 was never false if not extension.startswith('.'): 

extension = '.' + extension 

yield extension 

 

@property 

def default_extension(self): 

"""Return the first extension as default.""" 

return next(self.get_extensions()) 

 

def check_extension(self, path): 

"""Check whether the path ends with an archive suffix.""" 

for extension in self.get_extensions(): 

if path.endswith(extension): 

return extension 

 

def remove_extension(self, path): 

"""Remove the extension from the given path and return it.""" 

extension = self.check_extension(path) 

if extension is None: 

return 

return path[:-len(extension)] 

 

def has_mimetype(self, mimetype): 

return mimetype in self.mimetypes 

 

def check_mimetype(self, path): 

mime = magic.from_file(path, mime=True) 

return self.has_mimetype(mime) 

 

def clean_destination(self, destination, overwrite): 

if not os.path.exists(destination): 

return 

if not overwrite: 

raise FileExistsError(destination) 

delete_path(destination) 

 

def run_cmd(self, cmd): 

cwd = None 

if isinstance(cmd, tuple): 

cmd, cwd = cmd 

run(cmd, cwd) 

 

########################################################################## 

# xtract 

 

def get_xtract_default_destination(self, source): 

destination = self.remove_extension(source) 

if destination is None: 

destination = source + self.unknown_suffix 

return destination 

 

def get_xtract_destination(self, source, destination=None): 

if destination is None: 

return self.get_xtract_default_destination(source) 

if not os.path.isdir(destination): 

return destination 

basename = os.path.basename(self.get_xtract_default_destination(source)) 

return os.path.join(destination, basename) 

 

def xtract(self, source, destination=None, overwrite=False, delete_source=False): 

destination = self.get_xtract_destination(source, destination) 

self.clean_destination(destination, overwrite) 

 

self.pre_xtract(source, destination) 

 

cmd = self.get_xtract_command(source, destination) 

self.run_cmd(cmd) 

 

if delete_source: 

delete_path(source) 

 

return destination 

 

def pre_xtract(self, source, destination): 

pass 

 

def get_xtract_command(self, source, destination): 

raise NotImplementedError 

 

########################################################################## 

# insert 

 

def get_insert_default_destination(self, source): 

return source + self.default_extension 

 

def get_insert_destination(self, source, destination=None): 

if destination is None: 

return self.get_insert_default_destination(source) 

if not os.path.isdir(destination): 

if not destination.endswith(self.default_extension): 

destination += self.default_extension 

return destination 

basename = os.path.basename(self.get_insert_default_destination(source)) 

return os.path.join(destination, basename) 

 

 

class BaseArchiveType(BaseFileType): 

 

def unpack(self, source, destination=None, overwrite=False): 

return self.xtract(source, destination=destination, overwrite=overwrite) 

 

def pre_unpack(self, source, destination): 

try: 

os.makedirs(destination) 

except OSError: 

pass 

 

def pre_xtract(self, *args, **kwargs): 

return self.pre_unpack(*args, **kwargs) 

 

def get_unpack_command(self, source, destination): 

raise NotImplementedError 

 

def get_xtract_command(self, *args, **kwargs): 

return self.get_unpack_command(*args, **kwargs) 

 

def pack(self, source_root, source_files=None, destination=None, overwrite=False, delete_source=False): 

original_source_root = source_root 

if source_files is None: 

source_root = os.path.dirname(source_root) 

source_files = os.path.basename(original_source_root) 

if destination is None: 

destination = source_root 

destination = self.get_insert_destination(original_source_root, destination) 

 

source_files = self.get_source_files(source_root, source_files) 

 

self.clean_destination(destination, overwrite) 

 

self.pre_pack(source_root, source_files, destination) 

 

cmd = self.get_pack_command(source_root, source_files, destination) 

self.run_cmd(cmd) 

 

181 ↛ 182line 181 didn't jump to line 182, because the condition on line 181 was never true if delete_source: 

delete_path(original_source_root) 

 

return destination 

 

def get_source_files(self, source_root, source_files): 

if not isinstance(source_files, (tuple, list)): 

source_files = [source_files] 

for path in source_files: 

190 ↛ 191line 190 didn't jump to line 191, because the condition on line 190 was never true if os.path.isabs(path): 

yield os.path.relpath(path, source_root) 

else: 

yield path 

 

def pre_pack(self, source_root, source_files, destination): 

pass 

 

def get_pack_command(self, source_root, source_files, destination): 

raise NotImplementedError 

 

 

class BaseCompressionType(BaseFileType): 

 

def decompress(self, source, destination=None, overwrite=False): 

return self.xtract(source, destination=destination, overwrite=overwrite) 

 

def pre_decompress(self, source, destination): 

try: 

os.makedirs(os.path.dirname(destination)) 

except OSError: 

pass 

 

def pre_xtract(self, *args, **kwargs): 

return self.pre_decompress(*args, **kwargs) 

 

def get_decompress_command(self, source, destination): 

"""Return command to decompress the given file to the given destination.""" 

return "{} --decompress -c {} > {}".format(self.name, source, destination) 

 

def get_xtract_command(self, *args, **kwargs): 

return self.get_decompress_command(*args, **kwargs) 

 

def compress(self, source, destination=None, overwrite=False, delete_source=False): 

destination = self.get_insert_destination(source, destination) 

 

self.clean_destination(destination, overwrite) 

 

self.pre_compress(source, destination) 

 

cmd = self.get_compress_command(source, destination) 

self.run_cmd(cmd) 

 

233 ↛ 234line 233 didn't jump to line 234, because the condition on line 233 was never true if delete_source: 

delete_path(source) 

 

return destination 

 

def pre_compress(self, source, destination): 

pass 

 

def get_compress_command(self, source, destination): 

"""Return command to compress the given file to the given destination.""" 

path, filename = os.path.split(source) 

return "{} -c {} > {}".format(self.name, filename, destination), path