Compare commits

...

4 Commits

Author SHA1 Message Date
Terekhin Alexander ca5d3b0d03 Zip stream processing for single thread 5 years ago
Terekhin Alexander a7b5c8c36c Update proccessing, remove unnessesary functional code 5 years ago
Terekhin Alexander fd2e8cd199 Update proccessing 5 years ago
Terekhin Alexander 7b26ce92fe Downloaded files processing 5 years ago
  1. 7
      src/main/java/me/bearns/fias/exceptions/DownloadException.java
  2. 11
      src/main/java/me/bearns/fias/exceptions/UnzipException.java
  3. 81
      src/main/java/me/bearns/fias/service/FiasUpdater.java
  4. 7
      src/main/java/me/bearns/fias/service/Updater.java
  5. 164
      src/main/java/me/bearns/fias/util/ZipReader.java
  6. 8
      src/main/java/me/bearns/fias/webapi/API.java

@ -0,0 +1,7 @@
package me.bearns.fias.exceptions;
public class DownloadException extends Exception {
public DownloadException(Exception e) {
super(e);
}
}

@ -0,0 +1,11 @@
package me.bearns.fias.exceptions;
public class UnzipException extends Exception {
public UnzipException(Exception e){
super(e);
}
public UnzipException(String s) {
super(s);
}
}

@ -1,14 +1,20 @@
package me.bearns.fias.service;
import me.bearns.fias.domain.FiasVersion;
import me.bearns.fias.exceptions.DownloadException;
import me.bearns.fias.exceptions.UnzipException;
import me.bearns.fias.repository.FiasVersionRepository;
import me.bearns.fias.util.ZipReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Stream;
@Service
public class FiasUpdater implements Updater {
@ -17,12 +23,13 @@ public class FiasUpdater implements Updater {
FiasVersionRepository versions;
@Autowired
OnlineVersion clientStub;
private static Downloader service;
@Autowired
Downloader downloader;
OnlineVersion clientStub;
@Override
public void update() {
public void update() throws DownloadException, UnzipException {
//Max value
FiasVersion dbVersionObj = versions.findTopByOrderByVersionIdDesc();
@ -48,7 +55,7 @@ public class FiasUpdater implements Updater {
}
@Override
public void reload(Long... regions) {
public void reload(Long... regions) throws DownloadException, UnzipException {
final List<FiasVersion> lastVersion = clientStub.getLastVersion();
if(lastVersion != null) {
@ -58,13 +65,69 @@ public class FiasUpdater implements Updater {
}
//transaction from here
private void processUpdates(List<FiasVersion> updates, boolean reloadFlag, Long ... regions) {
private void processUpdates(List<FiasVersion> updates, boolean reloadFlag, Long ... regions) throws DownloadException, UnzipException {
if(reloadFlag) {
//todo reload flag impl
versions.deleteAll();
}
updates.sort((o1, o2) -> Math.toIntExact(o1.getVersionId() - o2.getVersionId()));
//just for test
final Stream<Future<File>> stream = updates.stream().map(u -> downloader.download(reloadFlag ? u.getFiasCompleteXmlUrl() : u.getFiasDeltaXmlUrl()));
Map<FiasVersion, Future<File>> map = new HashMap<>();
//start download
updates.forEach(u -> map.put(u, service.download(reloadFlag ? u.getFiasCompleteXmlUrl() : u.getFiasDeltaXmlUrl())));
//strict order
for (FiasVersion item : updates) {
final File file;
try {
//wait for downloading
file = map.get(item).get();
//process update
processArchive(file, regions);
//apply this version
versions.save(item);
} catch (InterruptedException e) {
e.printStackTrace();
//todo log
throw new DownloadException(e);
} catch (ExecutionException e) {
e.printStackTrace();
//todo log
throw new DownloadException(e);
} catch (FileNotFoundException e) {
throw new UnzipException(e);
}
}
}
private void processArchive(File file, Long ... regions) throws FileNotFoundException, UnzipException {
try(final ZipReader reader = new ZipReader(new FileInputStream(file))){
final Set<String> list = reader.list();
list.forEach(s -> {
try(final InputStream inputStream = reader.read(s)) {
//todo
} catch (UnzipException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (IOException e) {
throw new UnzipException(e);
}
}
}

@ -1,16 +1,19 @@
package me.bearns.fias.service;
import me.bearns.fias.exceptions.DownloadException;
import me.bearns.fias.exceptions.UnzipException;
public interface Updater {
/**
* Run online update for fias database
*/
public void update();
public void update() throws DownloadException, UnzipException;
/**
* Load or reload database for selected regions
* @param regions
*/
public void reload(Long ... regions);
public void reload(Long ... regions) throws DownloadException, UnzipException;
}

@ -0,0 +1,164 @@
package me.bearns.fias.util;
import me.bearns.fias.exceptions.UnzipException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class ZipReader implements Closeable {
private final InputStream in;
private final Set<String> set = new HashSet<>();
private final ReentrantLock mutex = new ReentrantLock();
public ZipReader(InputStream in){
this.in = in;
}
public Set<String> list() throws UnzipException {
if(set.isEmpty()) init();
return set;
}
public InputStream read(String file) throws UnzipException {
if(file == null) throw new UnzipException("Filename in archive cannot be null");
mutex.lock();
InputStream retval = null;
ZipInputStream zis = null;
try{
zis = new ZipInputStream(in);
ZipEntry zipEntry;
while ((zipEntry = zis.getNextEntry()) != null) {
if(file.equals(zipEntry.getName())) {
//TODO
retval = new FileStream(zis);
}
zis.closeEntry();
}
} catch (IOException e) {
throw new UnzipException(e);
} finally {
if(retval == null) {
mutex.unlock();
try {
if(zis!=null) zis.close();
} catch (IOException e) {
//log
}
throw new UnzipException("File not found in archive");
}
}
return retval;
}
private synchronized void init() throws UnzipException {
mutex.lock();
try(ZipInputStream zis = new ZipInputStream(in)){
ZipEntry zipEntry;
while ((zipEntry = zis.getNextEntry()) != null) {
set.add(zipEntry.getName());
zis.closeEntry();
}
} catch (IOException e) {
throw new UnzipException(e);
} finally {
mutex.unlock();
}
}
@Override
public void close() throws IOException {
mutex.unlock();
if(in!=null) {
in.close();
}
}
public class FileStream extends InputStream{
private ZipInputStream zis;
public FileStream(ZipInputStream zis) {
this.zis = zis;
}
@Override
public int read() throws IOException {
return zis.read();
}
@Override
public int read(byte[] b) throws IOException {
return zis.read(b);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return zis.read(b, off, len);
}
@Override
public long skip(long n) throws IOException {
return zis.skip(n);
}
@Override
public int available() throws IOException {
return zis.available();
}
@Override
public void close() throws IOException {
try {
zis.closeEntry();
zis.close();
} finally {
mutex.unlock();
}
}
@Override
public synchronized void mark(int readlimit) {
zis.mark(readlimit);
}
@Override
public synchronized void reset() throws IOException {
zis.reset();
}
@Override
public boolean markSupported() {
return zis.markSupported();
}
}
}

@ -1,5 +1,6 @@
package me.bearns.fias.webapi;
import me.bearns.fias.exceptions.DownloadException;
import me.bearns.fias.service.Updater;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
@ -13,6 +14,11 @@ public class API {
@GetMapping("/update")
void update(){
service.update();
try {
service.update();
} catch (DownloadException e) {
//TODO
e.printStackTrace();
}
}
}

Loading…
Cancel
Save