diff --git a/src/main/java/me/bearns/fias/service/FiasUpdater.java b/src/main/java/me/bearns/fias/service/FiasUpdater.java
index 9cf3a88..121d5f3 100644
--- a/src/main/java/me/bearns/fias/service/FiasUpdater.java
+++ b/src/main/java/me/bearns/fias/service/FiasUpdater.java
@@ -20,13 +20,16 @@ import java.util.zip.ZipFile;
 public class FiasUpdater implements Updater {
 
     @Autowired
-    FiasVersionRepository versions;
+    private FiasVersionRepository versions;
 
     @Autowired
-    private static Downloader service;
+    private StreamSaver unmarshaller;
 
     @Autowired
-    OnlineVersion clientStub;
+    private Downloader fileService;
+
+    @Autowired
+    private OnlineVersion clientStub;
 
     @Override
     public void update() throws CommonException {
@@ -77,7 +80,7 @@ public class FiasUpdater implements Updater {
         Map> map = new HashMap<>();
 
         //start download
-        updates.forEach(u -> map.put(u, service.download(reloadFlag ? u.getFiasCompleteXmlUrl() : u.getFiasDeltaXmlUrl())));
+        updates.forEach(u -> map.put(u, fileService.download(reloadFlag ? u.getFiasCompleteXmlUrl() : u.getFiasDeltaXmlUrl())));
 
         //strict order
         for (FiasVersion item : updates) {
@@ -114,7 +117,11 @@ public class FiasUpdater implements Updater {
 
                 final ZipEntry entry = entries.nextElement();
 
-                if(check(entry)) processEntry(zipFile.getInputStream(entry), regions);
+                if(check(entry)) { //todo: pass the CatalogItem and region filter for this entry; a null conf is rejected
+                    try (InputStream is = zipFile.getInputStream(entry)) {
+                        unmarshaller.process(is, null, null);
+                    }
+                }
             }
 
         } catch (ZipException e) {
@@ -125,10 +132,6 @@ public class FiasUpdater implements Updater {
 
     }
 
-    private void processEntry(InputStream is, Long ... regions){
-        //todo
-    }
-
     private boolean check(ZipEntry entry){
         //todo
         return true;
diff --git a/src/main/java/me/bearns/fias/service/StreamSaver.java b/src/main/java/me/bearns/fias/service/StreamSaver.java
new file mode 100644
index 0000000..d1d5651
--- /dev/null
+++ b/src/main/java/me/bearns/fias/service/StreamSaver.java
@@ -0,0 +1,22 @@
+package me.bearns.fias.service;
+
+import me.bearns.fias.util.CatalogItem;
+import org.springframework.data.jpa.repository.JpaRepository;
+
+import java.io.InputStream;
+import java.util.function.Predicate;
+
+/**
+ * Reads records from an XML stream and saves them to a repository.
+ */
+public interface StreamSaver {
+
+    /**
+     * Processes one XML input stream.
+     *
+     * @param is     XML stream to read
+     * @param conf   configuration describing which element to read and where to save it
+     * @param filter optional record filter; {@code null} accepts every record
+     */
+    void process(InputStream is, CatalogItem conf, Predicate filter);
+}
diff --git a/src/main/java/me/bearns/fias/service/StreamSaverImpl.java b/src/main/java/me/bearns/fias/service/StreamSaverImpl.java
new file mode 100644
index 0000000..3300222
--- /dev/null
+++ b/src/main/java/me/bearns/fias/service/StreamSaverImpl.java
@@ -0,0 +1,123 @@
+package me.bearns.fias.service;
+
+import me.bearns.fias.util.CatalogItem;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Component;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.Attribute;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * StAX/JAXB implementation of {@link StreamSaver}: walks the XML event stream, unmarshals
+ * every element named {@code conf.getQName()} and saves it to {@code conf.getRepository()}.
+ */
+@Component
+public class StreamSaverImpl implements StreamSaver {
+
+    private static final Logger LOG = Logger.getLogger(StreamSaverImpl.class.getName());
+
+    /**
+     * Reads {@code is} to the end and saves every matching record accepted by {@code filter}.
+     * Records that fail to unmarshal are logged and skipped. A stream-level XML or JAXB
+     * failure is logged and stops the read; the repository is flushed in either case.
+     *
+     * @param is     XML stream; the caller remains responsible for closing it
+     * @param conf   element name, JAXB classes and target repository; must not be {@code null}
+     * @param filter optional record filter; {@code null} accepts every record
+     * @throws NullPointerException if {@code conf} is {@code null}
+     */
+    @Override
+    public void process(InputStream is, CatalogItem conf, Predicate filter) {
+        // fail fast with a named argument instead of an anonymous NPE halfway through the parse
+        Objects.requireNonNull(conf, "conf");
+
+        long countErrors = 0;
+        XMLEventReader xmlEventReader = null;
+        try {
+            xmlEventReader = newHardenedFactory().createXMLEventReader(is);
+
+            // initialize jaxb
+            final JAXBContext context = JAXBContext.newInstance(conf.getParentCls());
+            final Unmarshaller unmarshaller = context.createUnmarshaller();
+
+            XMLEvent e;
+            while ((e = xmlEventReader.peek()) != null) {
+
+                // everything except the start of a target element is simply consumed
+                if (!e.isStartElement() || !((StartElement) e).getName().equals(conf.getQName())) {
+                    xmlEventReader.next();
+                    continue;
+                }
+
+                Object obj;
+                try {
+                    // unmarshal the element; the loop relies on this advancing the reader
+                    obj = unmarshaller.unmarshal(xmlEventReader, conf.getCls()).getValue();
+                } catch (Exception ex) {
+                    countErrors++;
+                    LOG.log(Level.WARNING, "Unmarshalling error, record skipped", ex);
+                    // If nothing was consumed before the failure, peek() would hand back the
+                    // same event forever; step over it so the loop always makes progress.
+                    if (xmlEventReader.peek() == e) {
+                        xmlEventReader.next();
+                    }
+                    continue;
+                }
+
+                // region filter
+                if (filter != null && !filter.test(obj)) {
+                    continue;
+                }
+
+                // save to repository
+                conf.getRepository().save(obj);
+            }
+        } catch (XMLStreamException ex) {
+            LOG.log(Level.SEVERE, "XML stream could not be read", ex);
+        } catch (JAXBException ex) {
+            LOG.log(Level.SEVERE, "JAXB context or unmarshaller could not be created", ex);
+        } finally {
+            closeQuietly(xmlEventReader);
+        }
+
+        // write to DB from cache
+        conf.getRepository().flush();
+
+        if (countErrors > 0) {
+            LOG.log(Level.WARNING, "Unmarshalling errors suppressed: {0}", countErrors);
+        }
+    }
+
+    /** Creates a StAX factory with DTDs and external entities switched off (XXE hardening). */
+    private static XMLInputFactory newHardenedFactory() {
+        final XMLInputFactory factory = XMLInputFactory.newInstance();
+        factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
+        factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
+        return factory;
+    }
+
+    /** Releases the reader's resources; this does not close the underlying stream. */
+    private static void closeQuietly(XMLEventReader reader) {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (XMLStreamException ex) {
+            LOG.log(Level.FINE, "Failed to close XML event reader", ex);
+        }
+    }
+}
diff --git a/src/main/java/me/bearns/fias/util/Catalog.java b/src/main/java/me/bearns/fias/util/Catalog.java
new file mode 100644
index 0000000..b90e228
--- /dev/null
+++ b/src/main/java/me/bearns/fias/util/Catalog.java
@@ -0,0 +1,79 @@
+package me.bearns.fias.util;
+
+import me.bearns.fias.repository.FiasVersionRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Component;
+
+import javax.xml.namespace.QName;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Registry of {@link CatalogItem} configurations keyed by prefix.
+ */
+@Component
+public class Catalog {
+
+    private final Map<String, Item> prefixMap = new HashMap<>();
+
+    /**
+     * Registers the known prefixes.
+     *
+     * @param FIASJPA_REPOSITORY currently unused
+     */
+    public Catalog(FiasVersionRepository FIASJPA_REPOSITORY) {
+        //todo: only the element names are set so far; repository and JAXB classes are still null
+        prefixMap.put("AS_ADDROBJ_", new Item(null, new QName("", "Object"), null, null));
+        prefixMap.put("AS_HOUSE_", new Item(null, new QName("", "House"), null, null));
+    }
+
+    /**
+     * Looks up the item registered for a prefix.
+     *
+     * @param s prefix, e.g. {@code "AS_HOUSE_"}
+     * @return the registered item, or {@code null} if the prefix is unknown
+     */
+    public Item getByPrefix(String s) {
+        return prefixMap.get(s);
+    }
+
+    /**
+     * Immutable {@link CatalogItem}. Declared {@code static} so that it does not keep a
+     * hidden reference to the enclosing {@link Catalog}.
+     */
+    public static class Item implements CatalogItem {
+
+        private final JpaRepository repository;
+        private final QName qName;
+        private final Class cls;
+        private final Class parentCls;
+
+        private Item(JpaRepository repository, QName qName, Class cls, Class parentCls) {
+            this.repository = repository;
+            this.qName = qName;
+            this.cls = cls;
+            this.parentCls = parentCls;
+        }
+
+        @Override
+        public JpaRepository getRepository() {
+            return repository;
+        }
+
+        @Override
+        public QName getQName() {
+            return qName;
+        }
+
+        @Override
+        public Class getCls() {
+            return cls;
+        }
+
+        @Override
+        public Class getParentCls() {
+            return parentCls;
+        }
+    }
+}
diff --git a/src/main/java/me/bearns/fias/util/CatalogItem.java b/src/main/java/me/bearns/fias/util/CatalogItem.java
new file mode 100644
index 0000000..c4cf8ed
--- /dev/null
+++ b/src/main/java/me/bearns/fias/util/CatalogItem.java
@@ -0,0 +1,23 @@
+package me.bearns.fias.util;
+
+import org.springframework.data.jpa.repository.JpaRepository;
+
+import javax.xml.namespace.QName;
+
+/**
+ * Describes how one kind of XML record is recognised, unmarshalled and stored.
+ */
+public interface CatalogItem {
+
+    /** @return repository the records are saved to */
+    JpaRepository getRepository();
+
+    /** @return name of the XML element that holds one record */
+    QName getQName();
+
+    /** @return class a record element is unmarshalled into */
+    Class getCls();
+
+    /** @return class the JAXB context is created for */
+    Class getParentCls();
+}