python - unionAll resulting in StackOverflow
I've made some progress on my own question (How to load a DataFrame from a Python requests stream that is downloading a CSV file?) on StackOverflow, but now I'm receiving a StackOverflow error:
import requests
import numpy as np
import pandas as pd
import sys

if sys.version_info[0] < 3:
    from StringIO import StringIO
else:
    from io import StringIO

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

chunk_size = 1024
url = "https://{0}:8443/gateway/default/webhdfs/v1/{1}?op=OPEN".format(host, filepath)

r = requests.get(url, auth=(username, password),
                 verify=False, allow_redirects=True, stream=True)

df = None
curr_line = 1
remainder = ''
for chunk in r.iter_content(chunk_size):
    txt = remainder + chunk
    [lines, remainder] = txt.rsplit('\n', 1)
    pdf = pd.read_csv(StringIO(lines), sep='|', header=None)
    if df is None:
        df = sqlContext.createDataFrame(pdf)
    else:
        df = df.unionAll(sqlContext.createDataFrame(pdf))

print df.count()
The stacktrace is here:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-b3a89df3c7d8> in <module>()
     36                 df = sqlContext.createDataFrame(pdf)
     37             else:
---> 38                 df = df.unionAll(sqlContext.createDataFrame(pdf))
     39
     40 #curr_line = curr_line + 1

/usr/local/src/spark160master/spark/python/pyspark/sql/dataframe.py in unionAll(self, other)
    993         equivalent to `UNION ALL` in SQL.
    994         """
--> 995         return DataFrame(self._jdf.unionAll(other._jdf), self.sql_ctx)
    996
    997     @since(1.3)

/usr/local/src/spark160master/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814
    815         for temp_arg in temp_args:

/usr/local/src/spark160master/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/usr/local/src/spark160master/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o19563.unionAll.
: java.lang.StackOverflowError
I'm not sure how to fix this. Any tips appreciated.
Generally speaking, you shouldn't iteratively merge distributed data structures without controlling the number of partitions. You'll find a complete explanation of what is going on in Stackoverflow due to long RDD Lineage. Unfortunately, DataFrames are a bit trickier:
dfs = ...  # list of pyspark.sql.DataFrame

def unionAll(*dfs):
    if not dfs:
        raise ValueError()
    first = dfs[0]
    # Union at the RDD level in a single step, then rebuild one
    # DataFrame; this avoids the deeply nested plan that an
    # iterative DataFrame.unionAll produces.
    return first.sql_ctx.createDataFrame(
        first._sc.union([df.rdd for df in dfs]),
        first.schema
    )

unionAll(*dfs)
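In the asker's streaming loop there is also a simpler option: parse each chunk into a pandas DataFrame, collect the pieces in a list, and call createDataFrame once at the end, so no iterative unionAll is needed at all. A minimal sketch of the accumulation part, pandas only (the helper name read_chunks and the sample chunks are illustrative, not from the original code):

```python
import pandas as pd
from io import StringIO

def read_chunks(chunks, sep='|'):
    """Parse streamed text chunks into pandas DataFrames and
    concatenate them once, instead of merging after every chunk.
    The combined frame can then be handed to a single
    sqlContext.createDataFrame call."""
    parts = []
    remainder = ''
    for chunk in chunks:
        txt = remainder + chunk
        # Keep any trailing partial line for the next chunk.
        lines, remainder = txt.rsplit('\n', 1)
        parts.append(pd.read_csv(StringIO(lines), sep=sep, header=None))
    if remainder:
        # Flush a final line that had no trailing newline.
        parts.append(pd.read_csv(StringIO(remainder), sep=sep, header=None))
    return pd.concat(parts, ignore_index=True)

# Example with two chunks where a row is split across the boundary:
combined = read_chunks(['1|2\n3|', '4\n5|6\n'])
```

This trades memory (the whole file sits in pandas before Spark sees it) for a flat lineage, which is often acceptable for files small enough to parse on the driver anyway.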